http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java index fc75601..81e3e4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java @@ -17,6 +17,8 @@ package org.apache.nifi.cluster.manager.impl; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -25,6 +27,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.flow.DataFlowManagementService; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.node.Node.Status; import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; @@ -41,10 +44,12 @@ public class WebClusterManagerCoordinator implements ClusterCoordinator { private final WebClusterManager clusterManager; private final ClusterManagerProtocolSender protocolSender; + private final DataFlowManagementService dfmService; - public WebClusterManagerCoordinator(final WebClusterManager clusterManager, final ClusterManagerProtocolSender protocolSender) { + public WebClusterManagerCoordinator(final WebClusterManager clusterManager, final ClusterManagerProtocolSender protocolSender, final DataFlowManagementService dfmService) { this.clusterManager = clusterManager; this.protocolSender = protocolSender; + this.dfmService = dfmService; } @Override @@ -196,6 +201,8 @@ public class WebClusterManagerCoordinator implements ClusterCoordinator { protocolSender.notifyNodeStatusChange(nodesToNotify, message); } + dfmService.setNodeIds(getNodeIdentifiers(NodeConnectionState.CONNECTED)); + return true; } @@ -241,6 +248,17 @@ public class WebClusterManagerCoordinator implements ClusterCoordinator { message.setStatusUpdateIdentifier(statusUpdateId); protocolSender.notifyNodeStatusChange(nodesToNotify, message); + dfmService.setNodeIds(getNodeIdentifiers(NodeConnectionState.CONNECTED)); } } + + @Override + public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() { + final Set<Node> nodes = clusterManager.getNodes(); + final Map<NodeConnectionState, List<NodeIdentifier>> connectionStatusMap = nodes.stream() + .map(node -> node.getNodeId()) + .collect(Collectors.groupingBy(nodeId -> getConnectionStatus(nodeId).getState())); + + return connectionStatusMap; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java index 2b3bff9..a06bdaf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java @@ -20,8 +20,6 @@ import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.cluster.event.EventManager; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.flow.DataFlowManagementService; -import org.apache.nifi.cluster.manager.HttpRequestReplicator; -import org.apache.nifi.cluster.manager.HttpResponseMapper; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; @@ -61,15 +59,11 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon */ return null; } else if (clusterManager == null) { - final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class); - final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class); final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class); final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class); // create the manager clusterManager = new WebClusterManager( - requestReplicator, - responseMapper, dataFlowService, senderListener, properties, http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml index 72c7bff..1de15cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml @@ -25,38 +25,6 @@ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd"> - <!-- jersey client --> - <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient"> - <constructor-arg> - <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/> - </constructor-arg> - <constructor-arg> - <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext"> - <constructor-arg ref="nifiProperties"/> - </bean> - </constructor-arg> - </bean> - - <!-- http request replicator --> - <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/> - </constructor-arg> - <constructor-arg ref="jersey-client" index="1"/> - <constructor-arg index="2"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/> - </constructor-arg> - <constructor-arg index="3"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/> - </constructor-arg> - <property name="nodeProtocolScheme"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/> - </property> - </bean> - - <!-- http response mapper --> - <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/> - <!-- cluster flow DAO --> <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl"> <constructor-arg index="0"> http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index e218e05..7d62b54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -283,6 +283,11 @@ public class TestAbstractHeartbeatMonitor { public NodeIdentifier getNodeIdentifier(final String uuid) { return statuses.keySet().stream().filter(p -> p.getId().equals(uuid)).findFirst().orElse(null); } + + @Override + public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() { + return statuses.keySet().stream().collect(Collectors.groupingBy(nodeId -> getConnectionStatus(nodeId).getState())); + } } public static class ReportedEvent { http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java new file mode 100644 index 0000000..3652595 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestCuratorHeartbeatMonitor.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.heartbeat; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.curator.test.TestingServer; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.DisconnectionCode; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.NiFiProperties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestCuratorHeartbeatMonitor { + private TestingServer zkServer; + private NodeIdentifier nodeId; + private TestFriendlyHeartbeatMonitor monitor; + + @Before + public void setup() throws Exception { + zkServer = new TestingServer(true); + zkServer.start(); + nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, false); + } + + @After + public void clear() throws IOException { + if (zkServer != null) { + zkServer.stop(); + zkServer.close(); + } + + if (monitor != null) { + monitor.stop(); + } + } + + /** + * Verifies that a node that sends a heartbeat that indicates that it is 'connected' is asked to connect to + * cluster if the cluster coordinator does not know about the node + * + * @throws InterruptedException if interrupted + */ + @Test + public void testNewConnectedHeartbeatFromUnknownNode() throws IOException, InterruptedException { + final List<NodeIdentifier> requestedToConnect = Collections.synchronizedList(new ArrayList<>()); + final ClusterCoordinatorAdapter coordinator = new ClusterCoordinatorAdapter() { + @Override + public synchronized void requestNodeConnect(final NodeIdentifier nodeId) { + requestedToConnect.add(nodeId); + } + }; + + final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator); + + // Ensure that we request the Unknown Node connect to the cluster + final NodeHeartbeat heartbeat = createHeartbeat(nodeId, NodeConnectionState.CONNECTED); + monitor.addHeartbeat(heartbeat); + monitor.waitForProcessed(); + + assertEquals(1, requestedToConnect.size()); + assertEquals(nodeId, requestedToConnect.get(0)); + assertEquals(1, coordinator.getEvents().size()); + } + + /** + * Verifies that a node that sends a heartbeat that indicates that it is 'connected' if previously + * manually disconnected, will be asked to disconnect from the cluster again. + * + * @throws InterruptedException if interrupted + */ + @Test + public void testHeartbeatFromManuallyDisconnectedNode() throws InterruptedException { + final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>()); + final Set<NodeIdentifier> requestedToDisconnect = Collections.synchronizedSet(new HashSet<>()); + final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() { + @Override + public synchronized void requestNodeConnect(final NodeIdentifier nodeId) { + super.requestNodeConnect(nodeId); + requestedToConnect.add(nodeId); + } + + @Override + public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { + super.requestNodeDisconnect(nodeId, disconnectionCode, explanation); + requestedToDisconnect.add(nodeId); + } + }; + + final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); + + adapter.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Testing"); + monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED)); + monitor.waitForProcessed(); + + assertEquals(1, requestedToDisconnect.size()); + assertEquals(nodeId, requestedToDisconnect.iterator().next()); + assertTrue(requestedToConnect.isEmpty()); + } + + + @Test + public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException { + final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>()); + final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>()); + final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() { + @Override + public synchronized void requestNodeConnect(final NodeIdentifier nodeId) { + super.requestNodeConnect(nodeId); + requestedToConnect.add(nodeId); + } + + @Override + public synchronized void finishNodeConnection(final NodeIdentifier nodeId) { + super.finishNodeConnection(nodeId); + connected.add(nodeId); + } + }; + + final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); + + adapter.requestNodeConnect(nodeId); // set state to 'connecting' + requestedToConnect.clear(); + + monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED)); + monitor.waitForProcessed(); + + assertEquals(1, connected.size()); + assertEquals(nodeId, connected.iterator().next()); + assertTrue(requestedToConnect.isEmpty()); + } + + + @Test + public void testDisconnectedHeartbeatOnStartup() throws InterruptedException { + final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>()); + final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>()); + final Set<NodeIdentifier> disconnected = Collections.synchronizedSet(new HashSet<>()); + final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() { + @Override + public synchronized void requestNodeConnect(final NodeIdentifier nodeId) { + super.requestNodeConnect(nodeId); + requestedToConnect.add(nodeId); + } + + @Override + public synchronized void finishNodeConnection(final NodeIdentifier nodeId) { + super.finishNodeConnection(nodeId); + connected.add(nodeId); + } + + @Override + public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { + super.requestNodeDisconnect(nodeId, disconnectionCode, explanation); + disconnected.add(nodeId); + } + }; + + final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); + + requestedToConnect.clear(); + + monitor.addHeartbeat(createHeartbeat(nodeId, DisconnectionCode.NODE_SHUTDOWN)); + monitor.waitForProcessed(); + + assertTrue(connected.isEmpty()); + assertTrue(requestedToConnect.isEmpty()); + assertTrue(disconnected.isEmpty()); + } + + private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) { + final NodeConnectionStatus status = new NodeConnectionStatus(disconnectionCode); + return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0); + } + + private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) { + final NodeConnectionStatus status = new NodeConnectionStatus(state); + return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0); + } + + private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) { + monitor = new TestFriendlyHeartbeatMonitor(coordinator, createProperties()); + monitor.start(); + return monitor; + } + + private Properties createProperties() { + final Properties properties = new Properties(); + properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms"); + properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, zkServer.getConnectString()); + properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, "3 secs"); + properties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, "3 secs"); + properties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, "/nifi"); + return properties; + } + + private static class ClusterCoordinatorAdapter implements ClusterCoordinator { + private final Map<NodeIdentifier, NodeConnectionStatus> statuses = new HashMap<>(); + private final List<ReportedEvent> events = new ArrayList<>(); + + @Override + public synchronized void requestNodeConnect(NodeIdentifier nodeId) { + statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTING)); + } + + @Override + public synchronized void finishNodeConnection(NodeIdentifier nodeId) { + statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED)); + } + + @Override + public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { + statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED)); + } + + @Override + public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) { + statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED)); + } + + @Override + public synchronized NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) { + return statuses.get(nodeId); + } + + @Override + public synchronized Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state) { + return statuses.entrySet().stream().filter(p -> p.getValue().getState() == state).map(p -> p.getKey()).collect(Collectors.toSet()); + } + + @Override + public synchronized boolean isBlockedByFirewall(String hostname) { + return false; + } + + @Override + public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) { + events.add(new ReportedEvent(nodeId, severity, event)); + } + + @Override + public synchronized void setPrimaryNode(NodeIdentifier nodeId) { + } + + synchronized List<ReportedEvent> getEvents() { + return new ArrayList<>(events); + } + + @Override + public NodeIdentifier getNodeIdentifier(final String uuid) { + return statuses.keySet().stream().filter(p -> p.getId().equals(uuid)).findFirst().orElse(null); + } + + @Override + public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() { + return null; + } + } + + public static class ReportedEvent { + private final NodeIdentifier nodeId; + private final Severity severity; + private final String event; + + public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) { + this.nodeId = nodeId; + this.severity = severity; + this.event = event; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public Severity getSeverity() { + return severity; + } + + public String getEvent() { + return event; + } + } + + + private static class TestFriendlyHeartbeatMonitor extends CuratorHeartbeatMonitor { + private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>(); + private final Object mutex = new Object(); + + public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) { + super(clusterCoordinator, properties); + } + + @Override + synchronized Map<NodeIdentifier, NodeHeartbeat> fetchHeartbeats() { + return heartbeats; + } + + @Override + synchronized void monitorHeartbeats() { + super.monitorHeartbeats(); + + synchronized (mutex) { + mutex.notify(); + } + } + + synchronized void addHeartbeat(final NodeHeartbeat heartbeat) { + heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat); + } + + @Override + public synchronized void removeHeartbeat(final NodeIdentifier nodeId) { + heartbeats.remove(nodeId); + } + + void waitForProcessed() throws InterruptedException { + synchronized (mutex) { + mutex.wait(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java new file mode 100644 index 0000000..f4d5d2e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.junit.Test; + +public class TestProcessorEndpointMerger { + + @Test + public void testMergeValidationErrors() { + final ProcessorEndpointMerger merger = new ProcessorEndpointMerger(); + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", "localhost", 9000, "localhost", 9001, "localhost", 9002, false); + final List<String> nodeValidationErrors1234 = new ArrayList<>(); + nodeValidationErrors1234.add("error 1"); + nodeValidationErrors1234.add("error 2"); + + merger.mergeValidationErrors(validationErrorMap, nodeId1234, nodeValidationErrors1234); + + final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); + final List<String> nodeValidationErrorsXyz = new ArrayList<>(); + nodeValidationErrorsXyz.add("error 1"); + + merger.mergeValidationErrors(validationErrorMap, nodeXyz, nodeValidationErrorsXyz); + + assertEquals(2, validationErrorMap.size()); + + final Set<NodeIdentifier> idsError1 = validationErrorMap.get("error 1"); + assertEquals(2, idsError1.size()); + assertTrue(idsError1.contains(nodeId1234)); + assertTrue(idsError1.contains(nodeXyz)); + + final Set<NodeIdentifier> idsError2 = validationErrorMap.get("error 2"); + assertEquals(1, idsError2.size()); + assertTrue(idsError2.contains(nodeId1234)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java new file mode 100644 index 0000000..dba973c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestStatusHistoryEndpointMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import static org.junit.Assert.assertEquals; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; + +import org.junit.Test; + +public class TestStatusHistoryEndpointMerger { + @Test + public void testNormalizedStatusSnapshotDate() throws ParseException { + final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS", Locale.US); + final Date date1 = df.parse("2014/01/01 00:00:00.000"); + final Date date2 = df.parse("2014/01/01 00:04:59.999"); + final Date date3 = df.parse("2014/01/01 00:05:00.000"); + final Date date4 = df.parse("2014/01/01 00:05:00.001"); + + final Date normalized1 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date1, 300000); + assertEquals(date1, normalized1); + + final Date normalized2 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date2, 300000); + assertEquals(date1, normalized2); + + final Date normalized3 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date3, 300000); + assertEquals(date3, normalized3); + + final Date normalized4 = StatusHistoryEndpointMerger.normalizeStatusSnapshotDate(date4, 300000); + assertEquals(date3, normalized4); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java new file mode 100644 index 0000000..c97db03 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.junit.Test; +import org.mockito.Mockito; + +import com.sun.jersey.api.client.ClientResponse; + +public class TestResponseUtils { + + @Test + public void testFindLongResponseTimes() throws URISyntaxException { + final Map<NodeIdentifier, NodeResponse> responses = new HashMap<>(); + final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); + final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false); + final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, false); + final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, false); + + final URI uri = new URI("localhost:8080"); + final ClientResponse clientResponse = Mockito.mock(ClientResponse.class); + responses.put(id1, new NodeResponse(id1, "GET", uri, clientResponse, TimeUnit.MILLISECONDS.toNanos(80), "1")); + responses.put(id2, new NodeResponse(id1, "GET", uri, clientResponse, TimeUnit.MILLISECONDS.toNanos(92), "1")); + responses.put(id3, new NodeResponse(id1, "GET", uri, clientResponse, TimeUnit.MILLISECONDS.toNanos(3), "1")); + responses.put(id4, new NodeResponse(id1, "GET", uri, clientResponse, TimeUnit.MILLISECONDS.toNanos(120), "1")); + + final AsyncClusterResponse response = new AsyncClusterResponse() { + @Override + public String getRequestIdentifier() { + return "1"; + } + + @Override + public String getMethod() { + return "GET"; + } + + @Override + public String getURIPath() { + return null; + } + + @Override + public Set<NodeIdentifier> getNodesInvolved() { + return new HashSet<>(responses.keySet()); + } + + @Override + public Set<NodeIdentifier> getCompletedNodeIdentifiers() { + return getNodesInvolved(); + } + + @Override + public boolean isComplete() { + return true; + } + + @Override + public boolean isOlderThan(long time, TimeUnit timeUnit) { + return true; + } + + @Override + public NodeResponse getMergedResponse() { + return null; + } + + @Override + public NodeResponse awaitMergedResponse() throws InterruptedException { + return null; + } + + @Override + public NodeResponse awaitMergedResponse(long timeout, TimeUnit timeUnit) throws InterruptedException { + return null; + } + + @Override + public NodeResponse getNodeResponse(NodeIdentifier nodeId) { + return responses.get(nodeId); + } + + @Override + public Set<NodeResponse> getCompletedNodeResponses() { + return new HashSet<>(responses.values()); + } + }; + + Set<NodeIdentifier> slowResponses = ResponseUtils.findLongResponseTimes(response, 1.5D); + assertTrue(slowResponses.isEmpty()); + + responses.put(id4, new NodeResponse(id1, "GET", uri, clientResponse, TimeUnit.MILLISECONDS.toNanos(2500), "1")); + slowResponses = ResponseUtils.findLongResponseTimes(response, 1.5D); + assertEquals(1, slowResponses.size()); + assertEquals(id4, slowResponses.iterator().next()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java new file mode 100644 index 0000000..6d3571b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.ws.rs.HttpMethod; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.flow.DataFlowManagementService; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.StandardOptimisticLockingManager; +import org.apache.nifi.web.api.entity.Entity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.header.InBoundHeaders; +import com.sun.jersey.core.header.OutBoundHeaders; + +public class TestThreadPoolRequestReplicator { + + @BeforeClass + public static void setupClass() { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); + } + + /** + * If we replicate a request, whenever we obtain the merged response from the AsyncClusterResponse object, + * the response should no longer be available and should be cleared from internal state. This test is to + * verify that this behavior occurs. + */ + @Test + public void testResponseRemovedWhenCompletedAndFetched() { + withReplicator(replicator -> { + final Set<NodeIdentifier> nodeIds = new HashSet<>(); + nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false)); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>()); + + // We should get back the same response object + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + assertEquals(HttpMethod.GET, response.getMethod()); + assertEquals(nodeIds, response.getNodesInvolved()); + + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS); + assertEquals(8000, nodeResponse.getNodeId().getApiPort()); + assertEquals(ClientResponse.Status.OK.getStatusCode(), nodeResponse.getStatus()); + + assertNull(replicator.getClusterResponse(response.getRequestIdentifier())); + }); + } + + + @Test(timeout = 15000) + public void testLongWaitForResponse() { + withReplicator(replicator -> { + final Set<NodeIdentifier> nodeIds = new HashSet<>(); + final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); + nodeIds.add(nodeId); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>()); + + // We should get back the same response object + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + assertFalse(response.isComplete()); + + final NodeResponse nodeResponse = response.getNodeResponse(nodeId); + assertNull(nodeResponse); + + final NodeResponse completedNodeResponse = response.awaitMergedResponse(2, TimeUnit.SECONDS); + assertNotNull(completedNodeResponse); + assertNotNull(completedNodeResponse.getThrowable()); + assertEquals(500, completedNodeResponse.getStatus()); + + assertTrue(response.isComplete()); + assertNotNull(response.getMergedResponse()); + assertNull(replicator.getClusterResponse(response.getRequestIdentifier())); + } , Status.OK, 1000, new ClientHandlerException(new SocketTimeoutException())); + } + + @Test(timeout = 15000) + public void testCompleteOnError() { + withReplicator(replicator -> { + final Set<NodeIdentifier> nodeIds = new HashSet<>(); + final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false); + final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false); + final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, false); + final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, false); + nodeIds.add(id1); + nodeIds.add(id2); + nodeIds.add(id3); + nodeIds.add(id4); + + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>()); + assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS)); + } , null, 0L, new IllegalArgumentException("Exception created for unit test")); + } + + + @Test(timeout = 15000) + public void testMultipleRequestWithTwoPhaseCommit() { + final Set<NodeIdentifier> nodeIds = new HashSet<>(); + nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false)); + + final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); + + final AtomicInteger requestCount = new AtomicInteger(0); + final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, + "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + @Override + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. + final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); + final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER); + + final int statusCode; + if (requestCount.incrementAndGet() == 1) { + assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader); + statusCode = 150; + } else { + assertNull(expectsHeader); + statusCode = Status.OK.getStatusCode(); + } + + // Return given response from all nodes. + final ClientResponse clientResponse = new ClientResponse(statusCode, new InBoundHeaders(), new ByteArrayInputStream(new byte[0]), null); + return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + } + }; + + replicator.start(); + + try { + final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>()); + clusterResponse.awaitMergedResponse(); + + // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not. + // These assertions are validated above, in the overridden replicateRequest method. + assertEquals(2, requestCount.get()); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } finally { + replicator.stop(); + } + } + + + @Test(timeout = 15000) + public void testOneNodeRejectsTwoPhaseCommit() { + final Set<NodeIdentifier> nodeIds = new HashSet<>(); + nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false)); + nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false)); + + final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); + + final AtomicInteger requestCount = new AtomicInteger(0); + final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, + "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + @Override + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. + final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); + final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER); + + final int requestIndex = requestCount.incrementAndGet(); + assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader); + + if (requestIndex == 1) { + final ClientResponse clientResponse = new ClientResponse(150, new InBoundHeaders(), new ByteArrayInputStream(new byte[0]), null); + return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + } else { + final IllegalClusterStateException explanation = new IllegalClusterStateException("Intentional Exception for Unit Testing"); + return new NodeResponse(nodeId, method, uri, explanation); + } + } + }; + + replicator.start(); + + try { + final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>()); + clusterResponse.awaitMergedResponse(); + + Assert.fail("Expected to get an IllegalClusterStateException but did not"); + } catch (final IllegalClusterStateException e) { + // Expected + } catch (final Exception e) { + Assert.fail(e.toString()); + } finally { + replicator.stop(); + } + } + + + + private void withReplicator(final WithReplicator function) { + withReplicator(function, ClientResponse.Status.OK, 0L, null); + } + + private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) { + final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); + + final DataFlowManagementService dfmService = Mockito.mock(DataFlowManagementService.class); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, + "1 sec", "1 sec", null, null, null, new StandardOptimisticLockingManager(), dfmService) { + @Override + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + if (delayMillis > 0L) { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException e) { + Assert.fail("Thread Interrupted durating test"); + } + } + + if (failure != null) { + throw failure; + } + + // Return given response from all nodes. + final ClientResponse clientResponse = new ClientResponse(status, new InBoundHeaders(), new ByteArrayInputStream(new byte[0]), null); + return new NodeResponse(nodeId, method, uri, clientResponse, -1L, requestId); + } + }; + + replicator.start(); + + try { + function.withReplicator(replicator); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } finally { + replicator.stop(); + } + } + + private interface WithReplicator { + void withReplicator(ThreadPoolRequestReplicator replicator) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java index e3e9ac6..6b93ef3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java @@ -44,6 +44,7 @@ import org.apache.nifi.cluster.manager.testutils.HttpResponse; import org.apache.nifi.cluster.manager.testutils.HttpResponseAction; import org.apache.nifi.cluster.manager.testutils.HttpServer; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -69,6 +70,7 @@ public class HttpRequestReplicatorImplTest { @Before public void setUp() throws IOException, URISyntaxException { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, getClass().getResource("/conf/nifi.properties").getFile()); executorThreadCount = 5; serverThreadCount = 3; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java deleted file mode 100644 index 55c6c31..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.impl; - -import static org.junit.Assert.assertEquals; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; - -import org.junit.Test; - -public class TestWebClusterManager { - - @Test - public void testNormalizedStatusSnapshotDate() throws ParseException { - final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS", Locale.US); - final Date date1 = df.parse("2014/01/01 00:00:00.000"); - final Date date2 = df.parse("2014/01/01 00:04:59.999"); - final Date date3 = df.parse("2014/01/01 00:05:00.000"); - final Date date4 = df.parse("2014/01/01 00:05:00.001"); - - final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000); - assertEquals(date1, normalized1); - - final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000); - assertEquals(date1, normalized2); - - final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000); - assertEquals(date3, normalized3); - - final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000); - assertEquals(date3, normalized4); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties new file mode 100644 index 0000000..78a649b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Core Properties # +nifi.version=nifi-test 3.0.0 +nifi.flow.configuration.file=./target/flow.xml.gz +nifi.flow.configuration.archive.dir=./target/archive/ +nifi.flowcontroller.autoResumeState=true +nifi.flowcontroller.graceful.shutdown.period=10 sec +nifi.flowservice.writedelay.interval=2 sec +nifi.administrative.yield.duration=30 sec + +nifi.reporting.task.configuration.file=./target/reporting-tasks.xml +nifi.controller.service.configuration.file=./target/controller-services.xml +nifi.templates.directory=./target/templates +nifi.ui.banner.text=UI Banner Text +nifi.ui.autorefresh.interval=30 sec +nifi.nar.library.directory=./target/lib +nifi.nar.working.directory=./target/work/nar/ + +# H2 Settings +nifi.database.directory=./database_repository +nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE + +# FlowFile Repository +nifi.flowfile.repository.directory=./target/test-repo +nifi.flowfile.repository.partitions=1 +nifi.flowfile.repository.checkpoint.interval=2 mins +nifi.queue.swap.threshold=20000 +nifi.swap.storage.directory=./target/test-repo/swap +nifi.swap.in.period=5 sec +nifi.swap.in.threads=1 +nifi.swap.out.period=5 sec +nifi.swap.out.threads=4 + +# Content Repository +nifi.content.claim.max.appendable.size=10 MB +nifi.content.claim.max.flow.files=100 +nifi.content.repository.directory.default=./target/content_repository + +# Provenance Repository Properties +nifi.provenance.repository.storage.directory=./target/provenance_repository +nifi.provenance.repository.max.storage.time=24 hours +nifi.provenance.repository.max.storage.size=1 GB +nifi.provenance.repository.rollover.time=30 secs +nifi.provenance.repository.rollover.size=100 MB + +# Site to Site properties +nifi.remote.input.socket.port=9990 +nifi.remote.input.secure=true + +# web properties # +nifi.web.war.directory=./target/lib +nifi.web.http.host= +nifi.web.http.port=8080 +nifi.web.https.host= +nifi.web.https.port= +nifi.web.jetty.working.directory=./target/work/jetty + +# security properties # +nifi.sensitive.props.key=key +nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL +nifi.sensitive.props.provider=BC + +nifi.security.keystore= +nifi.security.keystoreType= +nifi.security.keystorePasswd= +nifi.security.keyPasswd= +nifi.security.truststore= +nifi.security.truststoreType= +nifi.security.truststorePasswd= +nifi.security.needClientAuth= +nifi.security.authorizedUsers.file=./target/conf/authorized-users.xml +nifi.security.user.credential.cache.duration=24 hours +nifi.security.user.authority.provider=nifi.authorization.FileAuthorizationProvider +nifi.security.support.new.account.requests= +nifi.security.default.user.roles= + +# cluster common properties (cluster manager and nodes must have same values) # +nifi.cluster.protocol.heartbeat.interval=5 sec +nifi.cluster.protocol.is.secure=false +nifi.cluster.protocol.socket.timeout=30 sec +nifi.cluster.protocol.connection.handshake.timeout=45 sec +# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured # +nifi.cluster.protocol.use.multicast=false +nifi.cluster.protocol.multicast.address= +nifi.cluster.protocol.multicast.port= +nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms +nifi.cluster.protocol.multicast.service.locator.attempts=3 +nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec + +# cluster node properties (only configure for cluster nodes) # +nifi.cluster.is.node=false +nifi.cluster.node.address= +nifi.cluster.node.protocol.port= +nifi.cluster.node.protocol.threads=2 +# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx # +nifi.cluster.node.unicast.manager.address= +nifi.cluster.node.unicast.manager.protocol.port= +nifi.cluster.node.unicast.manager.authority.provider.port= + +# cluster manager properties (only configure for cluster manager) # +nifi.cluster.is.manager=false +nifi.cluster.manager.address= +nifi.cluster.manager.protocol.port= +nifi.cluster.manager.authority.provider.port= +nifi.cluster.manager.authority.provider.threads=10 +nifi.cluster.manager.node.firewall.file= +nifi.cluster.manager.node.event.history.size=10 +nifi.cluster.manager.node.api.connection.timeout=30 sec +nifi.cluster.manager.node.api.read.timeout=30 sec +nifi.cluster.manager.node.api.request.threads=10 +nifi.cluster.manager.flow.retrieval.delay=5 sec +nifi.cluster.manager.protocol.threads=10 +nifi.cluster.manager.safemode.duration=0 sec http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index a600699..4db6bc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnAdded; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerServiceLookup; /** @@ -109,7 +110,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - void unscheduleReferencingComponents(ControllerServiceNode serviceNode); + Set<ConfiguredComponent> unscheduleReferencingComponents(ControllerServiceNode serviceNode); /** * Verifies that all Controller Services referencing the provided Controller @@ -130,7 +131,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - void disableReferencingServices(ControllerServiceNode serviceNode); + Set<ConfiguredComponent> disableReferencingServices(ControllerServiceNode serviceNode); /** * Verifies that all Controller Services referencing the provided @@ -149,8 +150,10 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * Service A and B will both be enabled. * * @param serviceNode the node + * + * @return the set of all components that were updated as a result of this action */ - void enableReferencingServices(ControllerServiceNode serviceNode); + Set<ConfiguredComponent> enableReferencingServices(ControllerServiceNode serviceNode); /** * Verifies that all enabled Processors referencing the ControllerService @@ -172,5 +175,5 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - void scheduleReferencingComponents(ControllerServiceNode serviceNode); + Set<ConfiguredComponent> scheduleReferencingComponents(ControllerServiceNode serviceNode); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8c1cb9b..b888a1b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -83,7 +83,7 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; import org.apache.nifi.connectable.StandardConnection; import org.apache.nifi.controller.cluster.Heartbeater; -import org.apache.nifi.controller.cluster.ZooKeeperHeartbeater; +import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; @@ -534,7 +534,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (configuredForClustering) { leaderElectionManager = new CuratorLeaderElectionManager(4); - heartbeater = new ZooKeeperHeartbeater(protocolSender, properties); + heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); } else { leaderElectionManager = null; heartbeater = null; @@ -1273,6 +1273,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void sendShutdownNotification() { // Generate a heartbeat message and publish it, indicating that we are shutting down final HeartbeatMessage heartbeatMsg = createHeartbeatMessage(); + if (heartbeatMsg == null) { + LOG.warn("Cannot sent Shutdown Notification Message because node's identifier is not known at this time"); + return; + } + final Heartbeat heartbeat = heartbeatMsg.getHeartbeat(); final byte[] hbPayload = heartbeatMsg.getHeartbeat().getPayload(); final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NODE_SHUTDOWN); @@ -2540,6 +2545,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setProcessingNanos(0); status.setInvocations(0); status.setAverageLineageDuration(0L); + status.setFlowFilesRemoved(0); } else { final int processedCount = entry.getFlowFilesOut(); final long numProcessedBytes = entry.getContentSizeOut(); @@ -2566,6 +2572,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setBytesReceived(entry.getBytesReceived()); status.setFlowFilesSent(entry.getFlowFilesSent()); status.setBytesSent(entry.getBytesSent()); + status.setFlowFilesRemoved(entry.getFlowFilesRemoved()); } // determine the run status and get any validation errors... must check @@ -2835,23 +2842,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public void disableReferencingServices(final ControllerServiceNode serviceNode) { - controllerServiceProvider.disableReferencingServices(serviceNode); + public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.disableReferencingServices(serviceNode); } @Override - public void enableReferencingServices(final ControllerServiceNode serviceNode) { - controllerServiceProvider.enableReferencingServices(serviceNode); + public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.enableReferencingServices(serviceNode); } @Override - public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.scheduleReferencingComponents(serviceNode); + public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.scheduleReferencingComponents(serviceNode); } @Override - public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.unscheduleReferencingComponents(serviceNode); + public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + return controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } @Override @@ -3744,7 +3751,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R hbPayload.setTotalFlowFileBytes(queueSize.getByteCount()); // create heartbeat message - final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal()); + final NodeIdentifier nodeId = getNodeId(); + if (nodeId == null) { + LOG.warn("Cannot create Heartbeat Message because node's identifier is not known at this time"); + return null; + } + + final Heartbeat heartbeat = new Heartbeat(nodeId, bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal()); final HeartbeatMessage message = new HeartbeatMessage(); message.setHeartbeat(heartbeat); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 45999a3..fb72683 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -203,6 +205,18 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } + @Override + public void archiveFlow() throws IOException { + writeLock.lock(); + try { + final File archiveFile = dao.createArchiveFile(); + try (final OutputStream out = new FileOutputStream(archiveFile)) { + dao.load(out, true); + } + } finally { + writeLock.unlock(); + } + } @Override public void saveFlowChanges() throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java index 9206054..7060699 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java @@ -16,13 +16,15 @@ */ package org.apache.nifi.controller; -import java.util.Collection; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.Revision; + + /** * Represents a data flow snippet. */ @@ -33,14 +35,14 @@ public class StandardSnippet implements Snippet { private String parentGroupId; private Boolean linked; - private final Set<String> processGroups = new HashSet<>(); - private final Set<String> remoteProcessGroups = new HashSet<>(); - private final Set<String> processors = new HashSet<>(); - private final Set<String> inputPorts = new HashSet<>(); - private final Set<String> outputPorts = new HashSet<>(); - private final Set<String> connections = new HashSet<>(); - private final Set<String> labels = new HashSet<>(); - private final Set<String> funnels = new HashSet<>(); + private Map<String, Revision> processGroups = new HashMap<>(); + private Map<String, Revision> remoteProcessGroups = new HashMap<>(); + private Map<String, Revision> processors = new HashMap<>(); + private Map<String, Revision> inputPorts = new HashMap<>(); + private Map<String, Revision> outputPorts = new HashMap<>(); + private Map<String, Revision> connections = new HashMap<>(); + private Map<String, Revision> labels = new HashMap<>(); + private Map<String, Revision> funnels = new HashMap<>(); @Override public String getId() { @@ -74,75 +76,75 @@ public class StandardSnippet implements Snippet { } @Override - public Set<String> getConnections() { - return Collections.unmodifiableSet(connections); + public Map<String, Revision> getConnections() { + return Collections.unmodifiableMap(connections); } - public void addConnections(Collection<String> ids) { - connections.addAll(ids); + public void addConnections(Map<String, Revision> ids) { + connections.putAll(ids); } @Override - public Set<String> getFunnels() { - return Collections.unmodifiableSet(funnels); + public Map<String, Revision> getFunnels() { + return Collections.unmodifiableMap(funnels); } - public void addFunnels(Collection<String> ids) { - funnels.addAll(ids); + public void addFunnels(Map<String, Revision> ids) { + funnels.putAll(ids); } @Override - public Set<String> getInputPorts() { - return Collections.unmodifiableSet(inputPorts); + public Map<String, Revision> getInputPorts() { + return Collections.unmodifiableMap(inputPorts); } - public void addInputPorts(Collection<String> ids) { - inputPorts.addAll(ids); + public void addInputPorts(Map<String, Revision> ids) { + inputPorts.putAll(ids); } @Override - public Set<String> getOutputPorts() { - return Collections.unmodifiableSet(outputPorts); + public Map<String, Revision> getOutputPorts() { + return Collections.unmodifiableMap(outputPorts); } - public void addOutputPorts(Collection<String> ids) { - outputPorts.addAll(ids); + public void addOutputPorts(Map<String, Revision> ids) { + outputPorts.putAll(ids); } @Override - public Set<String> getLabels() { - return Collections.unmodifiableSet(labels); + public Map<String, Revision> getLabels() { + return Collections.unmodifiableMap(labels); } - public void addLabels(Collection<String> ids) { - labels.addAll(ids); + public void addLabels(Map<String, Revision> ids) { + labels.putAll(ids); } @Override - public Set<String> getProcessGroups() { - return Collections.unmodifiableSet(processGroups); + public Map<String, Revision> getProcessGroups() { + return Collections.unmodifiableMap(processGroups); } - public void addProcessGroups(Collection<String> ids) { - processGroups.addAll(ids); + public void addProcessGroups(Map<String, Revision> ids) { + processGroups.putAll(ids); } @Override - public Set<String> getProcessors() { - return Collections.unmodifiableSet(processors); + public Map<String, Revision> getProcessors() { + return Collections.unmodifiableMap(processors); } - public void addProcessors(Collection<String> ids) { - processors.addAll(ids); + public void addProcessors(Map<String, Revision> ids) { + processors.putAll(ids); } @Override - public Set<String> getRemoteProcessGroups() { - return Collections.unmodifiableSet(remoteProcessGroups); + public Map<String, Revision> getRemoteProcessGroups() { + return Collections.unmodifiableMap(remoteProcessGroups); } - public void addRemoteProcessGroups(Collection<String> ids) { - remoteProcessGroups.addAll(ids); + public void addRemoteProcessGroups(Map<String, Revision> ids) { + remoteProcessGroups.putAll(ids); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java new file mode 100644 index 0000000..8e4236b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.cluster; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.cluster.protocol.NodeProtocolSender; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate + * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are + * sent directly to the Cluster Coordinator. + */ +public class ClusterProtocolHeartbeater implements Heartbeater { + private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class); + + private final NodeProtocolSender protocolSender; + private final CuratorFramework curatorClient; + private final String nodesPathPrefix; + + private final String coordinatorPath; + private volatile String coordinatorAddress; + + + public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) { + this.protocolSender = protocolSender; + + final RetryPolicy retryPolicy = new RetryForever(5000); + final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties); + + curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), + zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + + curatorClient.start(); + nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); + coordinatorPath = nodesPathPrefix + "/coordinator"; + } + + private String getHeartbeatAddress() throws IOException { + final String curAddress = coordinatorAddress; + if (curAddress != null) { + return curAddress; + } + + try { + // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. + final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { + @Override + public void process(final WatchedEvent event) { + coordinatorAddress = null; + } + }).forPath(coordinatorPath); + final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); + + logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); + return address; + } catch (Exception e) { + throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); + } + } + + + @Override + public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { + final String heartbeatAddress = getHeartbeatAddress(); + + try { + protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); + } catch (final ProtocolException pe) { + // a ProtocolException is likely the result of not being able to communicate + // with the coordinator. If we do get an IOException communicating with the coordinator, + // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress + // to null so that we double-check next time that the coordinator has not changed. + if (pe.getCause() instanceof IOException) { + coordinatorAddress = null; + } + + throw pe; + } + } + + + @Override + public void close() throws IOException { + if (curatorClient != null) { + curatorClient.close(); + } + + logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper"); + } +}
