Repository: nifi Updated Branches: refs/heads/master 7a451935a -> a7e76cc00
http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/non-empty-flow.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/non-empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/non-empty-flow.xml new file mode 100644 index 0000000..edb06b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/non-empty-flow.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>00000000-0000-0000-0000-000000000000</id> + <name>Integration Test Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + </rootGroup> +</flowController> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index fee44ca..b1577b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,35 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -206,36 +234,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { @@ -363,7 +362,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final Lock writeLock = rwLock.writeLock(); private static final Logger LOG = LoggerFactory.getLogger(FlowController.class); - private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"); public static FlowController createStandaloneInstance( final FlowFileEventRepository flowFileEventRepo, @@ -3417,8 +3415,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeat(); } else { - leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); - leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); stateManagerProvider.disableClusterProvider(); setPrimary(false); @@ -3430,6 +3426,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + if (!clustered) { + leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); + leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); + } + // update the heartbeat bean this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), isPrimary())); } finally { @@ -3870,9 +3871,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } private class HeartbeatSendTask implements Runnable { - - private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); - @Override public void run() { try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) { @@ -3882,36 +3880,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final HeartbeatMessage message = createHeartbeatMessage(); if (message == null) { - heartbeatLogger.debug("No heartbeat to send"); + LOG.debug("No heartbeat to send"); return; } - final long sendStart = System.nanoTime(); heartbeater.send(message); - - final long sendNanos = System.nanoTime() - sendStart; - final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos); - - String heartbeatAddress; - try { - heartbeatAddress = heartbeater.getHeartbeatAddress(); - } catch (final IOException ioe) { - heartbeatAddress = "Cluster Coordinator (could not determine socket address)"; - } - - heartbeatLogger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis", - dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())), - heartbeatAddress, - dateFormatter.format(new Date()), - sendMillis); } catch (final UnknownServiceAddressException usae) { - if (heartbeatLogger.isDebugEnabled()) { - heartbeatLogger.debug(usae.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(usae.getMessage()); } } catch (final Throwable ex) { - heartbeatLogger.warn("Failed to send heartbeat due to: " + ex); - if (heartbeatLogger.isDebugEnabled()) { - heartbeatLogger.warn("", ex); + LOG.warn("Failed to send heartbeat due to: " + ex); + if (LOG.isDebugEnabled()) { + LOG.warn("", ex); } } } @@ -3950,7 +3931,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final HeartbeatMessage message = new HeartbeatMessage(); message.setHeartbeat(heartbeat); - heartbeatLogger.debug("Generated heartbeat"); + LOG.debug("Generated heartbeat"); return message; } catch (final Throwable ex) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 417a994..2d35a63 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -446,7 +446,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { * and heartbeat until a manager is located. */ final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow); - final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty); + final ConnectionResponse response = connect(true, localFlowEmpty, proposedFlow); // obtain write lock while we are updating the controller. We need to ensure that we don't // obtain the lock before calling connect(), though, or we will end up getting a deadlock @@ -454,7 +454,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // flow, as that requires a read lock. writeLock.lock(); try { - if (response == null) { + if (response == null || response.shouldTryLater()) { logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received."); // load local proposed flow @@ -523,6 +523,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString()); controller.setClustered(false, null); + clusterCoordinator.setConnected(false); } private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException { @@ -587,9 +588,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler { logger.info("Processing reconnection request from manager."); // reconnect - final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), + ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); + if (connectionResponse.getDataFlow() == null) { + logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow."); + connectionResponse = connect(false, false, createDataFlow()); + } + loadFromConnectionResponse(connectionResponse); clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() @@ -747,13 +753,13 @@ public class StandardFlowService implements FlowService, ProtocolHandler { return templates; } - private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely) throws ConnectionException { + private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely, final DataFlow dataFlow) throws ConnectionException { readLock.lock(); try { logger.info("Connecting Node: " + nodeId); // create connection request message - final ConnectionRequest request = new ConnectionRequest(nodeId); + final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow); final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage(); requestMsg.setConnectionRequest(request); @@ -772,19 +778,21 @@ public class StandardFlowService implements FlowService, ProtocolHandler { for (int i = 0; i < maxAttempts || retryIndefinitely; i++) { try { response = senderListener.requestConnection(requestMsg).getConnectionResponse(); - if (response.getRejectionReason() != null) { - logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason()); - // set response to null and treat a firewall blockage the same as getting no response from manager - response = null; - break; - } else if (response.shouldTryLater()) { - logger.info("Flow controller requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds."); + + if (response.shouldTryLater()) { + logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason()); try { Thread.sleep(response.getTryLaterSeconds() * 1000); } catch (final InterruptedException ie) { // we were interrupted, so finish quickly + Thread.currentThread().interrupt(); break; } + } else if (response.getRejectionReason() != null) { + logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason()); + // set response to null and treat a firewall blockage the same as getting no response from manager + response = null; + break; } else { // we received a successful connection response from manager break; @@ -824,7 +832,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // if response is null, then either we had IO problems or we were blocked by firewall or we couldn't determine manager's address return response; } else if (response.shouldTryLater()) { - // if response indicates we should try later, then manager was unable to service our request. Just load local flow and move on. + // if response indicates we should try later, then coordinator was unable to service our request. Just load local flow and move on. + // when the cluster coordinator is able to service requests, this node's heartbeat will trigger the cluster coordinator to reach + // out to this node and re-connect to the cluster. + logger.info("Received a 'try again' response from Cluster Coordinator when attempting to connect to cluster with explanation '" + + response.getRejectionReason() + "'. However, the maximum number of retries have already completed. Will load local flow and connect to the cluster when able."); return null; } else { // cluster manager provided a successful response with a current dataflow @@ -848,8 +860,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private void loadFromConnectionResponse(final ConnectionResponse response) throws ConnectionException { writeLock.lock(); try { - clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream() + if (response.getNodeConnectionStatuses() != null) { + clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream() .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); + } // get the dataflow from the response final DataFlow dataFlow = response.getDataFlow(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java index e2e7f9c..50d0382 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java @@ -17,8 +17,13 @@ package org.apache.nifi.controller.cluster; import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.ClusterRoles; @@ -54,7 +59,7 @@ public class ClusterProtocolHeartbeater implements Heartbeater { } @Override - public String getHeartbeatAddress() throws IOException { + public String getHeartbeatAddress() { final String heartbeatAddress = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); if (heartbeatAddress == null) { throw new ProtocolException("Cannot send heartbeat because there is no Cluster Coordinator currently elected"); @@ -65,6 +70,8 @@ public class ClusterProtocolHeartbeater implements Heartbeater { @Override public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { + final long sendStart = System.nanoTime(); + final String heartbeatAddress = getHeartbeatAddress(); final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); @@ -88,6 +95,21 @@ public class ClusterProtocolHeartbeater implements Heartbeater { } } } + + + final long sendNanos = System.nanoTime() - sendStart; + final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos); + + final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); + final String flowElectionMessage = responseMessage.getFlowElectionMessage(); + final String formattedElectionMessage = flowElectionMessage == null ? "" : "; " + flowElectionMessage; + + logger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis{}", + dateFormatter.format(new Date(heartbeatMessage.getHeartbeat().getCreatedTimestamp())), + heartbeatAddress, + dateFormatter.format(new Date()), + sendMillis, + formattedElectionMessage); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index efc7366..229617f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -137,9 +137,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { registeredRoles.remove(roleName); final LeaderRole leaderRole = leaderRoles.remove(roleName); + if (leaderRole == null) { + logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); + return; + } + final LeaderSelector leaderSelector = leaderRole.getLeaderSelector(); if (leaderSelector == null) { - logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); + logger.info("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 43cdb80..f3a4cbb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -75,7 +75,7 @@ import java.util.UUID; * and processor properties. Examples of items not involved in the fingerprint are: items in the processor "settings" or "comments" tabs, position information, flow controller settings, and counters. * */ -public final class FingerprintFactory { +public class FingerprintFactory { /* * Developer Note: This class should be changed with care and coordinated @@ -87,33 +87,47 @@ public final class FingerprintFactory { public static final String NO_VALUE = "NO_VALUE"; private static final String FLOW_CONFIG_XSD = "/FlowConfiguration.xsd"; - private static final Schema FLOW_CONFIG_SCHEMA; - private static final DocumentBuilder FLOW_CONFIG_DOC_BUILDER; private static final String ENCRYPTED_VALUE_PREFIX = "enc{"; private static final String ENCRYPTED_VALUE_SUFFIX = "}"; private final StringEncryptor encryptor; + private final DocumentBuilder flowConfigDocBuilder; private static final Logger logger = LoggerFactory.getLogger(FingerprintFactory.class); - static { + public FingerprintFactory(final StringEncryptor encryptor) { + this.encryptor = encryptor; + final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final Schema schema; try { - FLOW_CONFIG_SCHEMA = schemaFactory.newSchema(FingerprintFactory.class.getResource(FLOW_CONFIG_XSD)); + schema = schemaFactory.newSchema(FingerprintFactory.class.getResource(FLOW_CONFIG_XSD)); } catch (final Exception e) { throw new RuntimeException("Failed to parse schema for file flow configuration.", e); } try { - documentBuilderFactory.setSchema(FLOW_CONFIG_SCHEMA); - FLOW_CONFIG_DOC_BUILDER = documentBuilderFactory.newDocumentBuilder(); + documentBuilderFactory.setSchema(schema); + flowConfigDocBuilder = documentBuilderFactory.newDocumentBuilder(); } catch (final Exception e) { throw new RuntimeException("Failed to create document builder for flow configuration.", e); } } - public FingerprintFactory(final StringEncryptor encryptor) { - this.encryptor = encryptor; + /** + * Creates a fingerprint of a flow. The order of elements or attributes in the flow does not influence the fingerprint generation. + * This method does not accept a FlowController, which means that Processors cannot be created in order to verify default property + * values, etc. As a result, if Flow A and Flow B are fingerprinted and Flow B, for instance, contains a property with a default value + * that is not present in Flow A, then the two will have different fingerprints. + * + * @param flowBytes the flow represented as bytes + * + * @return a generated fingerprint + * + * @throws FingerprintException if the fingerprint failed to be generated + */ + public synchronized String createFingerprint(final byte[] flowBytes) throws FingerprintException { + return createFingerprint(flowBytes, null); } /** @@ -126,7 +140,7 @@ public final class FingerprintFactory { * * @throws FingerprintException if the fingerprint failed to be generated */ - public String createFingerprint(final byte[] flowBytes, final FlowController controller) throws FingerprintException { + public synchronized String createFingerprint(final byte[] flowBytes, final FlowController controller) throws FingerprintException { try { return createFingerprint(parseFlow(flowBytes), controller); } catch (final NoSuchAlgorithmException e) { @@ -178,7 +192,7 @@ public final class FingerprintFactory { } try { - return FLOW_CONFIG_DOC_BUILDER.parse(new ByteArrayInputStream(flow)); + return flowConfigDocBuilder.parse(new ByteArrayInputStream(flow)); } catch (final SAXException | IOException ex) { throw new FingerprintException(ex); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 1e30675..4c1c07c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -152,6 +152,8 @@ <nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout> <nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout> <nifi.cluster.firewall.file /> + <nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time> + <nifi.cluster.flow.election.max.candidates></nifi.cluster.flow.election.max.candidates> <nifi.cluster.request.replication.claim.timeout>15 secs</nifi.cluster.request.replication.claim.timeout> http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index bc5611e..3cbdcb0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -172,6 +172,8 @@ nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} nifi.cluster.firewall.file=${nifi.cluster.firewall.file} +nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time} +nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates} # zookeeper properties, used for cluster management # nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 92d6b7a..02a36c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -16,12 +16,39 @@ */ package org.apache.nifi.web.api; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.sun.jersey.api.core.HttpContext; -import com.sun.jersey.api.representation.Form; -import com.sun.jersey.core.util.MultivaluedMapImpl; -import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.CacheControl; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriBuilderException; +import javax.ws.rs.core.UriInfo; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeAccess; @@ -34,8 +61,10 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.Snippet; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.VersionNegotiator; @@ -58,37 +87,12 @@ import org.apache.nifi.web.security.util.CacheKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.core.CacheControl; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriBuilderException; -import javax.ws.rs.core.UriInfo; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; - -import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.sun.jersey.api.core.HttpContext; +import com.sun.jersey.api.representation.Form; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; /** * Base class for controllers. @@ -120,6 +124,7 @@ public abstract class ApplicationResource { protected NiFiProperties properties; private RequestReplicator requestReplicator; private ClusterCoordinator clusterCoordinator; + private FlowController flowController; private static final int MAX_CACHE_SOFT_LIMIT = 500; private final Cache<CacheKey, Request<? extends Entity>> twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); @@ -394,6 +399,8 @@ public abstract class ApplicationResource { return false; } + ensureFlowInitialized(); + // If not connected to the cluster, we do not replicate if (!isConnectedToCluster()) { return false; @@ -753,6 +760,12 @@ public abstract class ApplicationResource { return replicate(method, entity, nodeUuid, null); } + private void ensureFlowInitialized() { + if (!flowController.isInitialized()) { + throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow."); + } + } + /** * Replicates the request to the given node * @@ -773,6 +786,8 @@ public abstract class ApplicationResource { throw new UnknownNodeException("Cannot replicate request " + method + " " + getAbsolutePath() + " to node with ID " + nodeUuid + " because the specified node does not exist."); } + ensureFlowInitialized(); + final URI path = getAbsolutePath(); try { final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); @@ -812,6 +827,8 @@ public abstract class ApplicationResource { } protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) { + ensureFlowInitialized(); + try { // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly // to the cluster nodes themselves. @@ -828,6 +845,8 @@ public abstract class ApplicationResource { } protected Response replicateToCoordinator(final String method, final Object entity) { + ensureFlowInitialized(); + try { final NodeIdentifier coordinatorNode = getClusterCoordinatorNode(); final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode); @@ -906,6 +925,8 @@ public abstract class ApplicationResource { * @see #replicate(String, Object, Map) */ protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { + ensureFlowInitialized(); + final URI path = getAbsolutePath(); final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); @@ -935,6 +956,8 @@ public abstract class ApplicationResource { } protected RequestReplicator getRequestReplicator() { + ensureFlowInitialized(); + return requestReplicator; } @@ -950,6 +973,10 @@ public abstract class ApplicationResource { return clusterCoordinator; } + public void setFlowController(final FlowController flowController) { + this.flowController = flowController; + } + protected NiFiProperties getProperties() { return properties; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 2f3fb29..6982ea3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -197,6 +197,7 @@ <property name="properties" ref="nifiProperties"/> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> + <property name="flowController" ref="flowController" /> </bean> <bean id="resourceResource" class="org.apache.nifi.web.api.ResourceResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -204,6 +205,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="controllerResource" class="org.apache.nifi.web.api.ControllerResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -213,6 +215,7 @@ <property name="reportingTaskResource" ref="reportingTaskResource"/> <property name="controllerServiceResource" ref="controllerServiceResource"/> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="siteToSiteResource" class="org.apache.nifi.web.api.SiteToSiteResource" scope="singleton"> <constructor-arg ref="nifiProperties"/> @@ -221,6 +224,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="dataTransferResource" class="org.apache.nifi.web.api.DataTransferResource" scope="singleton"> <constructor-arg ref="nifiProperties"/> @@ -229,6 +233,7 @@ <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> <property name="serviceFacade" ref="serviceFacade"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="snippetResource" class="org.apache.nifi.web.api.SnippetResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -236,6 +241,7 @@ <property name="requestReplicator" ref="requestReplicator" /> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="templateResource" class="org.apache.nifi.web.api.TemplateResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -243,6 +249,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="controllerServiceResource" class="org.apache.nifi.web.api.ControllerServiceResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -250,6 +257,7 @@ <property name="requestReplicator" ref="requestReplicator"/> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="reportingTaskResource" class="org.apache.nifi.web.api.ReportingTaskResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -257,6 +265,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="processGroupResource" class="org.apache.nifi.web.api.ProcessGroupResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -273,6 +282,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="processorResource" class="org.apache.nifi.web.api.ProcessorResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -280,6 +290,7 @@ <property name="requestReplicator" ref="requestReplicator" /> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="connectionResource" class="org.apache.nifi.web.api.ConnectionResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -287,6 +298,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="flowfileQueueResource" class="org.apache.nifi.web.api.FlowFileQueueResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -294,6 +306,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="remoteProcessGroupResource" class="org.apache.nifi.web.api.RemoteProcessGroupResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -301,6 +314,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="inputPortResource" class="org.apache.nifi.web.api.InputPortResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -308,6 +322,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="outputPortResource" class="org.apache.nifi.web.api.OutputPortResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -315,6 +330,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="labelResource" class="org.apache.nifi.web.api.LabelResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -322,6 +338,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="funnelResource" class="org.apache.nifi.web.api.FunnelResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -329,6 +346,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer" /> + <property name="flowController" ref="flowController" /> </bean> <bean id="provenanceResource" class="org.apache.nifi.web.api.ProvenanceResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -336,12 +354,14 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="provenanceEventResource" class="org.apache.nifi.web.api.ProvenanceEventResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> <property name="properties" ref="nifiProperties"/> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> + <property name="flowController" ref="flowController" /> </bean> <bean id="countersResource" class="org.apache.nifi.web.api.CountersResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -349,6 +369,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="systemDiagnosticsResource" class="org.apache.nifi.web.api.SystemDiagnosticsResource" scope="singleton"> <property name="serviceFacade" ref="serviceFacade"/> @@ -356,6 +377,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> <property name="authorizer" ref="authorizer"/> + <property name="flowController" ref="flowController" /> </bean> <bean id="accessResource" class="org.apache.nifi.web.api.AccessResource" scope="singleton"> <property name="loginIdentityProvider" ref="loginIdentityProvider"/> @@ -369,6 +391,7 @@ <property name="properties" ref="nifiProperties"/> <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="requestReplicator" ref="requestReplicator" /> + <property name="flowController" ref="flowController" /> </bean> <bean id="accessPolicyResource" class="org.apache.nifi.web.api.AccessPolicyResource" scope="singleton"> <constructor-arg ref="serviceFacade"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js index f1b1b5e..96a3d68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js @@ -407,6 +407,8 @@ nf.Common = (function () { $('#message-title').text('Unauthorized'); } else if (xhr.status === 403) { $('#message-title').text('Access Denied'); + } else if (xhr.status === 409) { + $('#message-title').text('Invalid State'); } else { $('#message-title').text('An unexpected error has occurred'); }
