NIFI-250: Merge develop into NIFI-250
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c28d9f57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c28d9f57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c28d9f57 Branch: refs/heads/NIFI-250 Commit: c28d9f57287b5b95d6451a5365dbc2b3d4552422 Parents: 5a6723c 761e64a Author: Matt Gilman <[email protected]> Authored: Tue Mar 17 08:16:36 2015 -0400 Committer: Matt Gilman <[email protected]> Committed: Tue Mar 17 08:16:36 2015 -0400 ---------------------------------------------------------------------- .../nifi-cluster-protocol/.gitignore | 1 - .../nifi-cluster-protocol/pom.xml | 67 - .../protocol/ClusterManagerProtocolSender.java | 69 - .../cluster/protocol/ConnectionRequest.java | 44 - .../cluster/protocol/ConnectionResponse.java | 141 - .../apache/nifi/cluster/protocol/Heartbeat.java | 68 - .../nifi/cluster/protocol/NodeBulletins.java | 44 - .../nifi/cluster/protocol/NodeIdentifier.java | 172 - .../cluster/protocol/NodeProtocolSender.java | 73 - .../nifi/cluster/protocol/ProtocolContext.java | 39 - .../cluster/protocol/ProtocolException.java | 40 - .../nifi/cluster/protocol/ProtocolHandler.java | 44 - .../nifi/cluster/protocol/ProtocolListener.java | 72 - .../protocol/ProtocolMessageMarshaller.java | 38 - .../protocol/ProtocolMessageUnmarshaller.java | 38 - .../nifi/cluster/protocol/StandardDataFlow.java | 105 - .../UnknownServiceAddressException.java | 39 - .../impl/ClusterManagerProtocolSenderImpl.java | 245 - .../ClusterManagerProtocolSenderListener.java | 118 - .../protocol/impl/ClusterServiceDiscovery.java | 181 - .../protocol/impl/ClusterServiceLocator.java | 229 - .../impl/ClusterServicesBroadcaster.java | 182 - .../protocol/impl/CopyingInputStream.java | 77 - .../impl/MulticastProtocolListener.java | 204 - .../protocol/impl/NodeProtocolSenderImpl.java | 171 - .../impl/NodeProtocolSenderListener.java | 115 - .../protocol/impl/SocketProtocolListener.java | 205 - .../protocol/jaxb/JaxbProtocolContext.java | 148 - .../jaxb/message/AdaptedConnectionRequest.java | 40 - .../jaxb/message/AdaptedConnectionResponse.java | 109 - .../protocol/jaxb/message/AdaptedCounter.java | 56 - .../protocol/jaxb/message/AdaptedDataFlow.java | 64 - .../protocol/jaxb/message/AdaptedHeartbeat.java | 66 - .../jaxb/message/AdaptedNodeBulletins.java | 50 - .../jaxb/message/AdaptedNodeIdentifier.java | 76 - .../jaxb/message/ConnectionRequestAdapter.java | 41 - .../jaxb/message/ConnectionResponseAdapter.java | 55 - .../protocol/jaxb/message/DataFlowAdapter.java | 50 - .../protocol/jaxb/message/HeartbeatAdapter.java | 54 - .../jaxb/message/JaxbProtocolUtils.java | 42 - .../jaxb/message/NodeBulletinsAdapter.java | 48 - .../jaxb/message/NodeIdentifierAdapter.java | 51 - .../protocol/jaxb/message/ObjectFactory.java | 104 - .../message/ConnectionRequestMessage.java | 46 - .../message/ConnectionResponseMessage.java | 66 - .../ControllerStartupFailureMessage.java | 49 - .../protocol/message/DisconnectMessage.java | 55 - .../protocol/message/ExceptionMessage.java | 44 - .../protocol/message/FlowRequestMessage.java | 46 - .../protocol/message/FlowResponseMessage.java | 44 - .../protocol/message/HeartbeatMessage.java | 43 - .../message/MulticastProtocolMessage.java | 66 - .../protocol/message/NodeBulletinsMessage.java | 43 - .../cluster/protocol/message/PingMessage.java | 55 - .../message/PrimaryRoleAssignmentMessage.java | 56 - .../protocol/message/ProtocolMessage.java | 61 - .../message/ReconnectionFailureMessage.java | 45 - .../message/ReconnectionRequestMessage.java | 94 - .../message/ReconnectionResponseMessage.java | 32 - .../message/ServiceBroadcastMessage.java | 64 - .../MulticastConfigurationFactoryBean.java | 60 - .../ServerSocketConfigurationFactoryBean.java | 65 - .../spring/SocketConfigurationFactoryBean.java | 66 - .../resources/nifi-cluster-protocol-context.xml | 110 - .../ClusterManagerProtocolSenderImplTest.java | 134 - .../impl/ClusterServiceDiscoveryTest.java | 135 - .../impl/ClusterServiceLocatorTest.java | 121 - .../impl/ClusterServicesBroadcasterTest.java | 133 - .../impl/MulticastProtocolListenerTest.java | 171 - .../impl/NodeProtocolSenderImplTest.java | 203 - .../impl/testutils/DelayedProtocolHandler.java | 57 - .../testutils/ReflexiveProtocolHandler.java | 47 - .../nifi-framework/nifi-cluster-web/.gitignore | 1 - .../nifi-framework/nifi-cluster-web/pom.xml | 44 - .../nifi/cluster/context/ClusterContext.java | 59 - .../cluster/context/ClusterContextImpl.java | 69 - .../context/ClusterContextThreadLocal.java | 42 - .../nifi-framework/nifi-cluster/.gitignore | 1 - .../nifi-framework/nifi-cluster/pom.xml | 134 - .../cluster/client/MulticastTestClient.java | 151 - .../org/apache/nifi/cluster/event/Event.java | 122 - .../apache/nifi/cluster/event/EventManager.java | 65 - .../cluster/event/impl/EventManagerImpl.java | 143 - .../cluster/firewall/ClusterNodeFirewall.java | 35 - .../impl/FileBasedClusterNodeFirewall.java | 207 - .../nifi/cluster/flow/ClusterDataFlow.java | 56 - .../apache/nifi/cluster/flow/DaoException.java | 40 - .../apache/nifi/cluster/flow/DataFlowDao.java | 62 - .../cluster/flow/DataFlowManagementService.java | 132 - .../nifi/cluster/flow/PersistedFlowState.java | 37 - .../nifi/cluster/flow/StaleFlowException.java | 42 - .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 615 --- .../impl/DataFlowManagementServiceImpl.java | 413 -- .../nifi/cluster/manager/ClusterManager.java | 225 - .../cluster/manager/HttpClusterManager.java | 169 - .../cluster/manager/HttpRequestReplicator.java | 99 - .../cluster/manager/HttpResponseMapper.java | 42 - .../nifi/cluster/manager/NodeResponse.java | 329 -- .../exception/BlockedByFirewallException.java | 60 - .../manager/exception/ClusterException.java | 40 - .../ConnectingNodeMutableRequestException.java | 41 - ...DisconnectedNodeMutableRequestException.java | 41 - .../exception/IllegalClusterStateException.java | 41 - .../exception/IllegalNodeDeletionException.java | 41 - .../IllegalNodeDisconnectionException.java | 42 - .../IllegalNodeReconnectionException.java | 41 - .../IneligiblePrimaryNodeException.java | 41 - .../exception/MutableRequestException.java | 42 - .../exception/NoConnectedNodesException.java | 41 - .../exception/NoResponseFromNodesException.java | 42 - .../exception/NodeDisconnectionException.java | 41 - .../exception/NodeReconnectionException.java | 40 - .../PrimaryRoleAssignmentException.java | 41 - .../SafeModeMutableRequestException.java | 41 - .../manager/exception/UnknownNodeException.java | 41 - .../exception/UriConstructionException.java | 42 - .../manager/impl/ClusteredEventAccess.java | 135 - .../manager/impl/ClusteredReportingContext.java | 165 - .../manager/impl/HttpRequestReplicatorImpl.java | 531 --- .../manager/impl/HttpResponseMapperImpl.java | 85 - .../cluster/manager/impl/WebClusterManager.java | 4187 ------------------ .../java/org/apache/nifi/cluster/node/Node.java | 252 -- ...anagerProtocolServiceLocatorFactoryBean.java | 116 - ...FileBasedClusterNodeFirewallFactoryBean.java | 58 - .../spring/WebClusterManagerFactoryBean.java | 141 - .../reporting/ClusteredReportingTaskNode.java | 49 - .../resources/nifi-cluster-manager-context.xml | 128 - .../event/impl/EventManagerImplTest.java | 119 - .../impl/FileBasedClusterNodeFirewallTest.java | 98 - .../impl/DataFlowManagementServiceImplTest.java | 343 -- .../impl/HttpRequestReplicatorImplTest.java | 368 -- .../impl/HttpResponseMapperImplTest.java | 126 - .../manager/impl/TestWebClusterManager.java | 54 - .../cluster/manager/testutils/HttpRequest.java | 239 - .../cluster/manager/testutils/HttpResponse.java | 93 - .../manager/testutils/HttpResponseAction.java | 60 - .../cluster/manager/testutils/HttpServer.java | 240 - .../ClusterManagerProtocolSenderImplTest.java | 133 - .../impl/ClusterServiceLocatorTest.java | 119 - .../impl/ClusterServicesBroadcasterTest.java | 131 - .../impl/MulticastProtocolListenerTest.java | 171 - .../impl/NodeProtocolSenderImplTest.java | 201 - .../impl/SocketProtocolListenerTest.java | 132 - .../testutils/DelayedProtocolHandler.java | 57 - .../testutils/ReflexiveProtocolHandler.java | 47 - .../src/test/resources/logback-test.xml | 48 - .../apache/nifi/cluster/firewall/impl/empty.txt | 0 .../apache/nifi/cluster/firewall/impl/ips.txt | 12 - .../nifi-framework-cluster-protocol/.gitignore | 1 + .../nifi-framework-cluster-protocol/pom.xml | 67 + .../protocol/ClusterManagerProtocolSender.java | 69 + .../cluster/protocol/ConnectionRequest.java | 44 + .../cluster/protocol/ConnectionResponse.java | 141 + .../apache/nifi/cluster/protocol/Heartbeat.java | 68 + .../nifi/cluster/protocol/NodeBulletins.java | 44 + .../nifi/cluster/protocol/NodeIdentifier.java | 172 + .../cluster/protocol/NodeProtocolSender.java | 73 + .../nifi/cluster/protocol/ProtocolContext.java | 39 + .../cluster/protocol/ProtocolException.java | 40 + .../nifi/cluster/protocol/ProtocolHandler.java | 44 + .../nifi/cluster/protocol/ProtocolListener.java | 72 + .../protocol/ProtocolMessageMarshaller.java | 38 + .../protocol/ProtocolMessageUnmarshaller.java | 38 + .../nifi/cluster/protocol/StandardDataFlow.java | 105 + .../UnknownServiceAddressException.java | 39 + .../impl/ClusterManagerProtocolSenderImpl.java | 245 + .../ClusterManagerProtocolSenderListener.java | 118 + .../protocol/impl/ClusterServiceDiscovery.java | 181 + .../protocol/impl/ClusterServiceLocator.java | 229 + .../impl/ClusterServicesBroadcaster.java | 182 + .../protocol/impl/CopyingInputStream.java | 77 + .../impl/MulticastProtocolListener.java | 204 + .../protocol/impl/NodeProtocolSenderImpl.java | 171 + .../impl/NodeProtocolSenderListener.java | 115 + .../protocol/impl/SocketProtocolListener.java | 205 + .../protocol/jaxb/JaxbProtocolContext.java | 148 + .../jaxb/message/AdaptedConnectionRequest.java | 40 + .../jaxb/message/AdaptedConnectionResponse.java | 109 + .../protocol/jaxb/message/AdaptedCounter.java | 56 + .../protocol/jaxb/message/AdaptedDataFlow.java | 64 + .../protocol/jaxb/message/AdaptedHeartbeat.java | 66 + .../jaxb/message/AdaptedNodeBulletins.java | 50 + .../jaxb/message/AdaptedNodeIdentifier.java | 76 + .../jaxb/message/ConnectionRequestAdapter.java | 41 + .../jaxb/message/ConnectionResponseAdapter.java | 55 + .../protocol/jaxb/message/DataFlowAdapter.java | 50 + .../protocol/jaxb/message/HeartbeatAdapter.java | 54 + .../jaxb/message/JaxbProtocolUtils.java | 42 + .../jaxb/message/NodeBulletinsAdapter.java | 48 + .../jaxb/message/NodeIdentifierAdapter.java | 51 + .../protocol/jaxb/message/ObjectFactory.java | 104 + .../message/ConnectionRequestMessage.java | 46 + .../message/ConnectionResponseMessage.java | 66 + .../ControllerStartupFailureMessage.java | 49 + .../protocol/message/DisconnectMessage.java | 55 + .../protocol/message/ExceptionMessage.java | 44 + .../protocol/message/FlowRequestMessage.java | 46 + .../protocol/message/FlowResponseMessage.java | 44 + .../protocol/message/HeartbeatMessage.java | 43 + .../message/MulticastProtocolMessage.java | 66 + .../protocol/message/NodeBulletinsMessage.java | 43 + .../cluster/protocol/message/PingMessage.java | 55 + .../message/PrimaryRoleAssignmentMessage.java | 56 + .../protocol/message/ProtocolMessage.java | 61 + .../message/ReconnectionFailureMessage.java | 45 + .../message/ReconnectionRequestMessage.java | 94 + .../message/ReconnectionResponseMessage.java | 32 + .../message/ServiceBroadcastMessage.java | 64 + .../MulticastConfigurationFactoryBean.java | 60 + .../ServerSocketConfigurationFactoryBean.java | 65 + .../spring/SocketConfigurationFactoryBean.java | 66 + .../resources/nifi-cluster-protocol-context.xml | 110 + .../ClusterManagerProtocolSenderImplTest.java | 134 + .../impl/ClusterServiceDiscoveryTest.java | 135 + .../impl/ClusterServiceLocatorTest.java | 121 + .../impl/ClusterServicesBroadcasterTest.java | 133 + .../impl/MulticastProtocolListenerTest.java | 171 + .../impl/NodeProtocolSenderImplTest.java | 203 + .../impl/testutils/DelayedProtocolHandler.java | 57 + .../testutils/ReflexiveProtocolHandler.java | 47 + .../nifi-framework-cluster-web/.gitignore | 1 + .../nifi-framework-cluster-web/pom.xml | 44 + .../nifi/cluster/context/ClusterContext.java | 59 + .../cluster/context/ClusterContextImpl.java | 69 + .../context/ClusterContextThreadLocal.java | 42 + .../ClusterAwareOptimisticLockingManager.java | 96 + .../nifi-framework-cluster/.gitignore | 1 + .../nifi-framework-cluster/pom.xml | 148 + .../cluster/client/MulticastTestClient.java | 151 + .../org/apache/nifi/cluster/event/Event.java | 122 + .../apache/nifi/cluster/event/EventManager.java | 65 + .../cluster/event/impl/EventManagerImpl.java | 143 + .../cluster/firewall/ClusterNodeFirewall.java | 35 + .../impl/FileBasedClusterNodeFirewall.java | 207 + .../nifi/cluster/flow/ClusterDataFlow.java | 56 + .../apache/nifi/cluster/flow/DaoException.java | 40 + .../apache/nifi/cluster/flow/DataFlowDao.java | 62 + .../cluster/flow/DataFlowManagementService.java | 132 + .../nifi/cluster/flow/PersistedFlowState.java | 37 + .../nifi/cluster/flow/StaleFlowException.java | 42 + .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 615 +++ .../impl/DataFlowManagementServiceImpl.java | 413 ++ .../nifi/cluster/manager/ClusterManager.java | 225 + .../cluster/manager/HttpClusterManager.java | 169 + .../cluster/manager/HttpRequestReplicator.java | 99 + .../cluster/manager/HttpResponseMapper.java | 42 + .../nifi/cluster/manager/NodeResponse.java | 329 ++ .../exception/BlockedByFirewallException.java | 60 + .../manager/exception/ClusterException.java | 40 + .../ConnectingNodeMutableRequestException.java | 41 + ...DisconnectedNodeMutableRequestException.java | 41 + .../exception/IllegalClusterStateException.java | 41 + .../exception/IllegalNodeDeletionException.java | 41 + .../IllegalNodeDisconnectionException.java | 42 + .../IllegalNodeReconnectionException.java | 41 + .../IneligiblePrimaryNodeException.java | 41 + .../exception/MutableRequestException.java | 42 + .../exception/NoConnectedNodesException.java | 41 + .../exception/NoResponseFromNodesException.java | 42 + .../exception/NodeDisconnectionException.java | 41 + .../exception/NodeReconnectionException.java | 40 + .../PrimaryRoleAssignmentException.java | 41 + .../SafeModeMutableRequestException.java | 41 + .../manager/exception/UnknownNodeException.java | 41 + .../exception/UriConstructionException.java | 42 + .../manager/impl/ClusteredEventAccess.java | 135 + .../manager/impl/ClusteredReportingContext.java | 165 + .../manager/impl/HttpRequestReplicatorImpl.java | 531 +++ .../manager/impl/HttpResponseMapperImpl.java | 85 + .../cluster/manager/impl/WebClusterManager.java | 4187 ++++++++++++++++++ .../java/org/apache/nifi/cluster/node/Node.java | 252 ++ ...anagerProtocolServiceLocatorFactoryBean.java | 116 + ...FileBasedClusterNodeFirewallFactoryBean.java | 58 + .../spring/WebClusterManagerFactoryBean.java | 141 + .../reporting/ClusteredReportingTaskNode.java | 49 + .../resources/nifi-cluster-manager-context.xml | 128 + .../event/impl/EventManagerImplTest.java | 119 + .../impl/FileBasedClusterNodeFirewallTest.java | 98 + .../impl/DataFlowManagementServiceImplTest.java | 343 ++ .../impl/HttpRequestReplicatorImplTest.java | 368 ++ .../impl/HttpResponseMapperImplTest.java | 126 + .../manager/impl/TestWebClusterManager.java | 54 + .../cluster/manager/testutils/HttpRequest.java | 239 + .../cluster/manager/testutils/HttpResponse.java | 93 + .../manager/testutils/HttpResponseAction.java | 60 + .../cluster/manager/testutils/HttpServer.java | 240 + .../ClusterManagerProtocolSenderImplTest.java | 133 + .../impl/ClusterServiceLocatorTest.java | 119 + .../impl/ClusterServicesBroadcasterTest.java | 131 + .../impl/MulticastProtocolListenerTest.java | 171 + .../impl/NodeProtocolSenderImplTest.java | 201 + .../impl/SocketProtocolListenerTest.java | 132 + .../testutils/DelayedProtocolHandler.java | 57 + .../testutils/ReflexiveProtocolHandler.java | 47 + .../src/test/resources/logback-test.xml | 48 + .../apache/nifi/cluster/firewall/impl/empty.txt | 0 .../apache/nifi/cluster/firewall/impl/ips.txt | 12 + .../nifi-framework/nifi-framework-core/pom.xml | 19 +- .../nifi-framework/nifi-web/nifi-web-ui/pom.xml | 13 + .../nifi-framework/pom.xml | 6 +- .../nifi-standard-processors/pom.xml | 86 +- nifi/pom.xml | 36 +- 302 files changed, 19060 insertions(+), 18844 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml index 0000000,18e4ba4..e08a9bf mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml @@@ -1,0 -1,48 +1,44 @@@ + <?xml version="1.0" encoding="UTF-8"?> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework</artifactId> + <version>0.0.3-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-framework-cluster-web</artifactId> + <packaging>jar</packaging> + <description>The clustering software for communicating with the NiFi web api.</description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> - <artifactId>nifi-web-optimistic-locking</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> + <artifactId>nifi-administration</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-user-actions</artifactId> + </dependency> + </dependencies> + </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java index 0000000,012e7c7..c8c7206 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java @@@ -1,0 -1,47 +1,42 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.context; + + /** + * Manages a cluster context on a threadlocal. + */ + public class ClusterContextThreadLocal { + + private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); + + public static void removeContext() { + contextHolder.remove(); + } + + public static ClusterContext createEmptyContext() { + return new ClusterContextImpl(); + } + + public static ClusterContext getContext() { - ClusterContext ctx = contextHolder.get(); - if(ctx == null) { - ctx = createEmptyContext(); - contextHolder.set(ctx); - } - return ctx; ++ return contextHolder.get(); + } + + public static void setContext(final ClusterContext context) { + contextHolder.set(context); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml index 0000000,ef927a3..d069d72 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml @@@ -1,0 -1,144 +1,148 @@@ + <?xml version="1.0" encoding="UTF-8"?> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework</artifactId> + <version>0.0.3-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-framework-cluster</artifactId> + <packaging>jar</packaging> + <description>The clustering software for NiFi.</description> + <dependencies> + <!-- application core dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-logging-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-client-dto</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> ++ <artifactId>nifi-web-optimistic-locking</artifactId> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-core-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-cluster-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-cluster-web</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-web-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-administration</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + <!-- third party dependencies --> + + <!-- sun dependencies --> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </dependency> + + <!-- commons dependencies --> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + </dependency> + + <!-- jersey dependencies --> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> + + <!-- spring dependencies --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt</exclude> + <exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java index 0000000,eedb88f..c17b429 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java @@@ -1,0 -1,45 +1,56 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.flow; + + import org.apache.nifi.cluster.protocol.NodeIdentifier; + import org.apache.nifi.cluster.protocol.StandardDataFlow; + + /** + * A dataflow with additional information about the cluster. + * + * @author unattributed + */ + public class ClusterDataFlow { + + private final StandardDataFlow dataFlow; - + private final NodeIdentifier primaryNodeId; ++ private final byte[] controllerServices; ++ private final byte[] reportingTasks; + - public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) { ++ public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] reportingTasks) { + this.dataFlow = dataFlow; + this.primaryNodeId = primaryNodeId; ++ this.controllerServices = controllerServices; ++ this.reportingTasks = reportingTasks; + } + ++ public byte[] getControllerServices() { ++ return controllerServices; ++ } ++ ++ public byte[] getReportingTasks() { ++ return reportingTasks; ++ } ++ + public NodeIdentifier getPrimaryNodeId() { + return primaryNodeId; + } + + public StandardDataFlow getDataFlow() { + return dataFlow; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java index 0000000,339d904..082d65e mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java @@@ -1,0 -1,115 +1,132 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.flow; + + import java.util.Set; ++ + import org.apache.nifi.cluster.protocol.NodeIdentifier; + + /** + * A service for managing the cluster's flow. The service will attempt to keep + * the cluster's dataflow current while respecting the value of the configured + * retrieval delay. + * + * The eligible retrieval time is reset with the configured delay every time the + * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then + * the flow will not be retrieved. + * + * Clients must call start() and stop() to initialize and stop the instance. + * + * @author unattributed + */ + public interface DataFlowManagementService { + + /** + * Starts the instance. Start may only be called if the instance is not + * running. + */ + void start(); + + /** + * Stops the instance. Stop may only be called if the instance is running. + */ + void stop(); + + /** + * @return true if the instance is started; false otherwise. + */ + boolean isRunning(); + + /** + * Loads the dataflow. + * + * @return the dataflow or null if no dataflow exists + */ + ClusterDataFlow loadDataFlow(); + + /** + * Updates the dataflow with the given primary node identifier. + * + * @param nodeId the node identifier + * + * @throws DaoException if the update failed + */ + void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException; + + /** ++ * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM. ++ * ++ * @param serializedControllerServices ++ * @throws DaoException ++ */ ++ void updateControllerServices(byte[] serializedControllerServices) throws DaoException; ++ ++ /** ++ * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM. ++ * ++ * @param serviceNodes ++ * @throws DaoException ++ */ ++ void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException; ++ ++ /** + * Sets the state of the flow. + * + * @param flowState the state + * + * @see PersistedFlowState + */ + void setPersistedFlowState(PersistedFlowState flowState); + + /** + * @return the state of the flow + */ + PersistedFlowState getPersistedFlowState(); + + /** + * @return true if the flow is current; false otherwise. + */ + boolean isFlowCurrent(); + + /** + * Sets the node identifiers to use when attempting to retrieve the flow. + * + * @param nodeIds the node identifiers + */ + void setNodeIds(Set<NodeIdentifier> nodeIds); + + /** + * Returns the set of node identifiers the service is using to retrieve the + * flow. + * + * @return the set of node identifiers the service is using to retrieve the + * flow. + */ + Set<NodeIdentifier> getNodeIds(); + + /** + * @return the retrieval delay in seconds + */ + int getRetrievalDelaySeconds(); + + /** + * Sets the retrieval delay. + * + * @param delay the retrieval delay in seconds + */ + void setRetrievalDelay(String delay); + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index 0000000,72b594a..dd9d2a3 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@@ -1,0 -1,600 +1,615 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.flow.impl; + + import java.io.ByteArrayOutputStream; + import java.io.File; + import java.io.FileInputStream; + import java.io.FileOutputStream; + import java.io.FilenameFilter; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.security.MessageDigest; + import java.security.NoSuchAlgorithmException; + import java.util.Arrays; + import java.util.UUID; + + import javax.xml.bind.JAXBContext; + import javax.xml.bind.JAXBException; + import javax.xml.bind.Marshaller; + import javax.xml.bind.Unmarshaller; + import javax.xml.bind.annotation.XmlRootElement; + import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + import javax.xml.parsers.DocumentBuilder; + import javax.xml.parsers.DocumentBuilderFactory; + import javax.xml.transform.OutputKeys; + import javax.xml.transform.Transformer; + import javax.xml.transform.TransformerFactory; + import javax.xml.transform.dom.DOMSource; + import javax.xml.transform.stream.StreamResult; + + import org.apache.commons.compress.archivers.ArchiveEntry; + import org.apache.commons.compress.archivers.tar.TarArchiveEntry; + import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; + import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; + import org.apache.nifi.cluster.flow.ClusterDataFlow; + import org.apache.nifi.cluster.flow.DaoException; + import org.apache.nifi.cluster.flow.DataFlowDao; + import org.apache.nifi.cluster.flow.PersistedFlowState; + import org.apache.nifi.cluster.protocol.DataFlow; + import org.apache.nifi.cluster.protocol.NodeIdentifier; + import org.apache.nifi.cluster.protocol.StandardDataFlow; + import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + import org.apache.nifi.logging.NiFiLog; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.ByteArrayInputStream; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.util.file.FileUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.w3c.dom.Document; + import org.w3c.dom.Element; + + /** + * Implements the FlowDao interface. The implementation tracks the state of the + * dataflow by annotating the filename of the flow state file. Specifically, the + * implementation correlates PersistedFlowState states to filename extensions. + * The correlation is as follows: + * <ul> + * <li> CURRENT maps to flow.xml </li> + * <li> STALE maps to flow.xml.stale </li> + * <li> UNKNOWN maps to flow.xml.unknown </li> + * </ul> + * Whenever the flow state changes, the flow state file's name is updated to + * denote its state. + * + * The implementation also provides for a restore directory that may be + * configured for higher availability. At instance creation, if the primary or + * restore directories have multiple flow state files, an exception is thrown. + * If the primary directory has a current flow state file, but the restore + * directory does not, then the primary flow state file is copied to the restore + * directory. If the restore directory has a current flow state file, but the + * primary directory does not, then the restore flow state file is copied to the + * primary directory. If both the primary and restore directories have a current + * flow state file and the files are different, then an exception is thrown. + * + * When the flow state file is saved, it is always saved first to the restore + * directory followed by a save to the primary directory. When the flow state + * file is loaded, a check is made to verify that the primary and restore flow + * state files are both current. If either is not current, then an exception is + * thrown. The primary flow state file is always read when the load method is + * called. + * + * @author unattributed + */ + public class DataFlowDaoImpl implements DataFlowDao { + + private final File primaryDirectory; + private final File restoreDirectory; + private final boolean autoStart; + private final String generatedRootGroupId = UUID.randomUUID().toString(); + + public static final String STALE_EXT = ".stale"; + public static final String UNKNOWN_EXT = ".unknown"; + public static final String FLOW_PACKAGE = "flow.tar"; + public static final String FLOW_XML_FILENAME = "flow.xml"; + public static final String TEMPLATES_FILENAME = "templates.xml"; + public static final String SNIPPETS_FILENAME = "snippets.xml"; ++ public static final String CONTROLLER_SERVICES_FILENAME = "controller-services.xml"; ++ public static final String REPORTING_TASKS_FILENAME = "reporting-tasks.xml"; + public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml"; + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class)); + + public DataFlowDaoImpl(final File primaryDirectory) throws DaoException { + this(primaryDirectory, null, false); + } + + public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException { + + // sanity check that primary directory is a directory, creating it if necessary + if (primaryDirectory == null) { + throw new IllegalArgumentException("Primary directory may not be null."); + } else if (!primaryDirectory.exists()) { + if (!primaryDirectory.mkdir()) { + throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath())); + } + } else if (!primaryDirectory.isDirectory()) { + throw new IllegalArgumentException("Primary directory must be a directory."); + } + + this.autoStart = autoStart; + + try { + this.primaryDirectory = primaryDirectory; + this.restoreDirectory = restoreDirectory; + + if (restoreDirectory == null) { + // check that we have exactly one current flow state file + ensureSingleCurrentStateFile(primaryDirectory); + } else { + + // check that restore directory is a directory, creating it if necessary + FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); + + // check that restore directory is not the same as the primary directory + if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { + throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ", + primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + + final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory); + final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory); + + // if more than one state file in either primary or restore, then throw exception + if (primaryFlowStateFiles.length > 1) { + throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory)); + } else if (restoreFlowStateFiles.length > 1) { + throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory)); + } + + // check that the single primary state file we found is current or create a new one + final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory); + + // check that the single restore state file we found is current or create a new one + final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory); + + // if there was a difference in flow state file directories, then copy the appropriate files + if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) { + // copy primary state file to restore + FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger); + } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) { + // copy restore state file to primary + FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger); + } else { + // sync the primary copy with the restore copy + syncWithRestore(primaryFlowStateFile, restoreFlowStateFile); + } + + } + } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) { + throw new DaoException(ex); + } + } + + + private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException { + try (final FileInputStream primaryFis = new FileInputStream(primaryFile); + final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis); + final FileInputStream restoreFis = new FileInputStream(restoreFile); + final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) { + + final ArchiveEntry primaryEntry = primaryIn.getNextEntry(); + final ArchiveEntry restoreEntry = restoreIn.getNextEntry(); + + if ( primaryEntry == null && restoreEntry == null ) { + return; + } + + if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + + final byte[] primaryMd5 = calculateMd5(primaryIn); + final byte[] restoreMd5 = calculateMd5(restoreIn); + + if ( !Arrays.equals(primaryMd5, restoreMd5) ) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + } + } + + private byte[] calculateMd5(final InputStream in) throws IOException { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (final NoSuchAlgorithmException nsae) { + throw new IOException(nsae); + } + + int len; + final byte[] buffer = new byte[8192]; + while ((len = in.read(buffer)) > -1) { + if (len > 0) { + digest.update(buffer, 0, len); + } + } + return digest.digest(); + } + + @Override + public ClusterDataFlow loadDataFlow() throws DaoException { + try { + return parseDataFlow(getExistingFlowStateFile(primaryDirectory)); + } catch (final IOException | JAXBException ex) { + throw new DaoException(ex); + } + } + + @Override + public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException { + try { + + final File primaryStateFile = getFlowStateFile(primaryDirectory); + + // write to restore before writing to primary in case primary experiences problems + if (restoreDirectory != null) { + final File restoreStateFile = getFlowStateFile(restoreDirectory); + if (restoreStateFile == null) { + if (primaryStateFile == null) { + writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow); + } else { + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'", + primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + } else { + if (primaryStateFile == null) { + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'", + restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath())); + } else { + final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile); + final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile); + if (primaryFlowState == restoreFlowState) { + writeDataFlow(restoreStateFile, dataFlow); + } else { + throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'", + primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState)); + } + } + } + } + + // write dataflow to primary + if (primaryStateFile == null) { + writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow); + } else { + writeDataFlow(primaryStateFile, dataFlow); + } + + } catch (final IOException | JAXBException ex) { + throw new DaoException(ex); + } + } + + @Override + public PersistedFlowState getPersistedFlowState() { + // trust restore over primary if configured for restore + if (restoreDirectory == null) { + return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory)); + } else { + return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory)); + } + } + + @Override + public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException { + // rename restore before primary if configured for restore + if (restoreDirectory != null) { + renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState); + } + renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState); + } + + private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException { + + // ensure that we have at most one state file and if we have one, it is current + final File[] directoryFlowStateFiles = getFlowStateFiles(dir); + if (directoryFlowStateFiles.length > 1) { + throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir)); + } else if (directoryFlowStateFiles.length == 0) { + // create a new file if none exist + return createNewFlowStateFile(dir); + } else { + // check that the single flow state file is current + final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]); + if (PersistedFlowState.CURRENT == flowState) { + return directoryFlowStateFiles[0]; + } else { + throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath())); + } + } + + } + + private PersistedFlowState getPersistedFlowState(final File file) { + final String path = file.getAbsolutePath(); + if (path.endsWith(STALE_EXT)) { + return PersistedFlowState.STALE; + } else if (path.endsWith(UNKNOWN_EXT)) { + return PersistedFlowState.UNKNOWN; + } else { + return PersistedFlowState.CURRENT; + } + } + + private File getFlowStateFile(final File dir) { + final File[] files = getFlowStateFiles(dir); + if (files.length > 1) { + throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length)); + } else if (files.length == 0) { + return null; + } else { + return files[0]; + } + } + + private File getExistingFlowStateFile(final File dir) { + final File file = getFlowStateFile(dir); + if (file == null) { + throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath())); + } + return file; + } + + private File[] getFlowStateFiles(final File dir) { + final File[] files = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT)); + } + }); + + if (files == null) { + return new File[0]; + } else { + return files; + } + } + + private File removeStateFileExtension(final File file) { + + final String path = file.getAbsolutePath(); + final int stateFileExtIndex; + if (path.endsWith(STALE_EXT)) { + stateFileExtIndex = path.lastIndexOf(STALE_EXT); + } else if (path.endsWith(UNKNOWN_EXT)) { + stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT); + } else { + stateFileExtIndex = path.length(); + } + + return new File(path.substring(0, stateFileExtIndex)); + } + + private File addStateFileExtension(final File file, final PersistedFlowState state) { + switch (state) { + case CURRENT: { + return file; + } + case STALE: { + return new File(file.getAbsolutePath() + STALE_EXT); + } + case UNKNOWN: { + return new File(file.getAbsolutePath() + UNKNOWN_EXT); + } + default: { + throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state); + } + } + } + + private File createNewFlowStateFile(final File dir) throws IOException, JAXBException { + final File stateFile = new File(dir, FLOW_PACKAGE); + stateFile.createNewFile(); + - final byte[] flowBytes = getEmptyFlowBytes(); - final byte[] templateBytes = new byte[0]; - final byte[] snippetBytes = new byte[0]; - final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); - - final ClusterMetadata clusterMetadata = new ClusterMetadata(); - writeDataFlow(stateFile, dataFlow, clusterMetadata); ++ writeDataFlow(stateFile, new ClusterDataFlow(null, null, new byte[0], new byte[0]), new ClusterMetadata()); + + return stateFile; + } + + private byte[] getEmptyFlowBytes() throws IOException { + try { + final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + final Document document = docBuilder.newDocument(); + + final Element controller = document.createElement("flowController"); + document.appendChild(controller); + + controller.appendChild(createTextElement(document, "maxThreadCount", "15")); + + final Element rootGroup = document.createElement("rootGroup"); + rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId)); + rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow")); + + // create the position element + final Element positionElement = createTextElement(document, "position", ""); + positionElement.setAttribute("x", "0.0"); + positionElement.setAttribute("y", "0.0"); + rootGroup.appendChild(positionElement); + + rootGroup.appendChild(createTextElement(document, "comment", "")); + controller.appendChild(rootGroup); + + final Transformer transformer = TransformerFactory.newInstance().newTransformer(); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + + final DOMSource source = new DOMSource(document); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final StreamResult result = new StreamResult(baos); + transformer.transform(source, result); + + return baos.toByteArray(); + } catch (final Exception e) { + throw new IOException(e); + } + } + + private Element createTextElement(final Document document, final String elementName, final String value) { + final Element element = document.createElement(elementName); + element.setTextContent(value); + return element; + } + + private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException { + final PersistedFlowState existingState = getPersistedFlowState(flowStateFile); + if (existingState != newState) { + final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState); + if (flowStateFile.renameTo(newFlowStateFile) == false) { + throw new DaoException( + String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath())); + } + } + } + + private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException { + byte[] flowBytes = new byte[0]; + byte[] templateBytes = new byte[0]; + byte[] snippetBytes = new byte[0]; + byte[] clusterInfoBytes = new byte[0]; - ++ byte[] controllerServiceBytes = new byte[0]; ++ byte[] reportingTaskBytes = new byte[0]; ++ + try (final InputStream inStream = new FileInputStream(file); + final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) { + TarArchiveEntry tarEntry; + while ((tarEntry = tarIn.getNextTarEntry()) != null) { + switch (tarEntry.getName()) { + case FLOW_XML_FILENAME: + flowBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, flowBytes, true); + break; + case TEMPLATES_FILENAME: + templateBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, templateBytes, true); + break; + case SNIPPETS_FILENAME: + snippetBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, snippetBytes, true); + break; + case CLUSTER_INFO_FILENAME: + clusterInfoBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true); + break; ++ case CONTROLLER_SERVICES_FILENAME: ++ controllerServiceBytes = new byte[(int) tarEntry.getSize()]; ++ StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true); ++ break; ++ case REPORTING_TASKS_FILENAME: ++ reportingTaskBytes = new byte[(int) tarEntry.getSize()]; ++ StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true); ++ break; + default: + throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName()); + } + } + } + + final ClusterMetadata clusterMetadata; + if (clusterInfoBytes.length == 0) { + clusterMetadata = null; + } else { + final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller(); + clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes)); + } + + final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); + dataFlow.setAutoStartProcessors(autoStart); + - return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId()); ++ return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes); + } + + private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException { + + // get the data flow + DataFlow dataFlow = clusterDataFlow.getDataFlow(); + + // if no dataflow, then write a new dataflow + if (dataFlow == null) { + dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); + } + + // setup the cluster metadata + final ClusterMetadata clusterMetadata = new ClusterMetadata(); + clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId()); + + // write to disk - writeDataFlow(file, dataFlow, clusterMetadata); ++ writeDataFlow(file, clusterDataFlow, clusterMetadata); + } + + private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException { + final TarArchiveEntry flowEntry = new TarArchiveEntry(filename); + flowEntry.setSize(bytes.length); + tarOut.putArchiveEntry(flowEntry); + tarOut.write(bytes); + tarOut.closeArchiveEntry(); + } + - private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException { ++ private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException { + + try (final OutputStream fos = new FileOutputStream(file); + final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) { + - writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); - writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); ++ final DataFlow dataFlow = clusterDataFlow.getDataFlow(); ++ if ( dataFlow == null ) { ++ writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes()); ++ writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]); ++ writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]); ++ } else { ++ writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); ++ writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); ++ writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); ++ } ++ writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices()); ++ writeTarEntry(tarOut, REPORTING_TASKS_FILENAME, clusterDataFlow.getReportingTasks()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); + writeClusterMetadata(clusterMetadata, baos); + final byte[] clusterInfoBytes = baos.toByteArray(); + + writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes); + } + } + + private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException { + // write cluster metadata to output stream + final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); + marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); + marshaller.marshal(clusterMetadata, os); + } + + @XmlRootElement(name = "clusterMetadata") + private static class ClusterMetadata { + + private NodeIdentifier primaryNodeId; + + private static final JAXBContext jaxbCtx; + + static { + try { + jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class); + } catch (final JAXBException je) { + throw new RuntimeException(je); + } + } + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getPrimaryNodeId() { + return primaryNodeId; + } + + public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) { + this.primaryNodeId = primaryNodeId; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java index 0000000,e135af3..1bb8ca3 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java @@@ -1,0 -1,356 +1,413 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.flow.impl; + + import java.util.Collections; + import java.util.Date; + import java.util.Set; + import java.util.Timer; + import java.util.TimerTask; + import java.util.concurrent.CopyOnWriteArraySet; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; + + import org.apache.nifi.cluster.flow.ClusterDataFlow; + import org.apache.nifi.cluster.flow.DaoException; + import org.apache.nifi.cluster.flow.DataFlowDao; + import org.apache.nifi.cluster.flow.DataFlowManagementService; + import org.apache.nifi.cluster.flow.PersistedFlowState; + import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; + import org.apache.nifi.cluster.protocol.NodeIdentifier; + import org.apache.nifi.cluster.protocol.StandardDataFlow; + import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; + import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; + import org.apache.nifi.logging.NiFiLog; + import org.apache.nifi.util.FormatUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Implements FlowManagementService interface. The service tries to keep the + * cluster's flow current with regards to the available nodes. + * + * The instance may be configured with a retrieval delay, which will reduce the + * number of retrievals performed by the service at the expense of increasing + * the chances that the service will not be able to provide a current flow to + * the caller. + * + * By default, the service will try to update the flow as quickly as possible. + * Configuring a delay enables a less aggressive retrieval strategy. + * Specifically, the eligible retrieval time is reset every time the flow state + * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow + * will not be retrieved. + * + * @author unattributed + */ + public class DataFlowManagementServiceImpl implements DataFlowManagementService { + + /* + * Developer Note: + * + * This class maintains an ExecutorService and a Runnable. + * Although the class is not externally threadsafe, its internals are protected to + * accommodate multithread access between the ExecutorServer and the Runnable. + * + */ + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class)); + + private final DataFlowDao flowDao; + + private final ClusterManagerProtocolSender sender; + + private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>(); + + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + + private final AtomicLong lastRetrievalTime = new AtomicLong(-1); + + private Timer flowRetriever; + + private long retrievableAfterTime = 0L; + + private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0); + + private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock()); + + public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) { + if (flowDao == null) { + throw new IllegalArgumentException("Flow DAO may not be null."); + } else if (sender == null) { + throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null."); + } + this.flowDao = flowDao; + this.sender = sender; + } + + @Override + public void start() { + + if (isRunning()) { + throw new IllegalArgumentException("Instance is already running."); + } + + // reset stop requested + stopRequested.set(false); + + // setup flow retreiver timer + flowRetriever = new Timer("Flow Management Service", /* is daemon */ true); + flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500); + } + + @Override + public boolean isRunning() { + return (flowRetriever != null); + } + + @Override + public void stop() { + + if (isRunning() == false) { + throw new IllegalArgumentException("Instance is already stopped."); + } + + // record stop request + stopRequested.set(true); + + flowRetriever.cancel(); + flowRetriever = null; + + } + + @Override + public ClusterDataFlow loadDataFlow() throws DaoException { + resourceLock.lock(); + try { + return flowDao.loadDataFlow(); + } finally { + resourceLock.unlock("loadDataFlow"); + } + } + + @Override + public void updatePrimaryNode(final NodeIdentifier nodeId) { + resourceLock.lock(); + try { + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + + final StandardDataFlow dataFlow; ++ final byte[] controllerServiceBytes; ++ final byte[] reportingTaskBytes; + if (existingClusterDataFlow == null) { + dataFlow = null; ++ controllerServiceBytes = new byte[0]; ++ reportingTaskBytes = new byte[0]; + } else { + dataFlow = existingClusterDataFlow.getDataFlow(); ++ controllerServiceBytes = existingClusterDataFlow.getControllerServices(); ++ reportingTaskBytes = existingClusterDataFlow.getReportingTasks(); + } + - flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId)); ++ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes)); + } finally { + resourceLock.unlock("updatePrimaryNode"); + } + } ++ ++ ++ @Override ++ public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException { ++ resourceLock.lock(); ++ try { ++ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); ++ ++ final StandardDataFlow dataFlow; ++ final byte[] reportingTaskBytes; ++ final NodeIdentifier nodeId; ++ if (existingClusterDataFlow == null) { ++ dataFlow = null; ++ nodeId = null; ++ reportingTaskBytes = new byte[0]; ++ } else { ++ dataFlow = existingClusterDataFlow.getDataFlow(); ++ nodeId = existingClusterDataFlow.getPrimaryNodeId(); ++ reportingTaskBytes = existingClusterDataFlow.getReportingTasks(); ++ } ++ ++ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes)); ++ } finally { ++ resourceLock.unlock("updateControllerServices"); ++ } ++ } ++ ++ @Override ++ public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException { ++ resourceLock.lock(); ++ try { ++ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); ++ ++ final StandardDataFlow dataFlow; ++ final byte[] controllerServiceBytes; ++ final NodeIdentifier nodeId; ++ if (existingClusterDataFlow == null) { ++ dataFlow = null; ++ nodeId = null; ++ controllerServiceBytes = null; ++ } else { ++ dataFlow = existingClusterDataFlow.getDataFlow(); ++ nodeId = existingClusterDataFlow.getPrimaryNodeId(); ++ controllerServiceBytes = existingClusterDataFlow.getControllerServices(); ++ } ++ ++ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes)); ++ } finally { ++ resourceLock.unlock("updateControllerServices"); ++ } ++ } + + @Override + public PersistedFlowState getPersistedFlowState() { + resourceLock.lock(); + try { + return flowDao.getPersistedFlowState(); + } finally { + resourceLock.unlock("getPersistedFlowState"); + } + } + + @Override + public boolean isFlowCurrent() { + return PersistedFlowState.CURRENT == getPersistedFlowState(); + } + + @Override + public void setPersistedFlowState(final PersistedFlowState flowState) { + // lock to ensure state change and retrievable time update are atomic + resourceLock.lock(); + try { + flowDao.setPersistedFlowState(flowState); + if (PersistedFlowState.STALE == flowState) { + retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000); + } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) { + retrievableAfterTime = Long.MAX_VALUE; + } + } finally { + resourceLock.unlock("setPersistedFlowState"); + } + } + + @Override + public Set<NodeIdentifier> getNodeIds() { + return Collections.unmodifiableSet(nodeIds); + } + + @Override + public void setNodeIds(final Set<NodeIdentifier> nodeIds) { + + if (nodeIds == null) { + throw new IllegalArgumentException("Node IDs may not be null."); + } + + resourceLock.lock(); + try { + + if (this.nodeIds.equals(nodeIds)) { + return; + } + + this.nodeIds.clear(); + this.nodeIds.addAll(nodeIds); + + } finally { + resourceLock.unlock("setNodeIds"); + } + + } + + @Override + public int getRetrievalDelaySeconds() { + return retrievalDelaySeconds.get(); + } + + @Override + public void setRetrievalDelay(final String retrievalDelay) { + this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS)); + } + + public ClusterManagerProtocolSender getSender() { + return sender; + } + + public long getLastRetrievalTime() { + return lastRetrievalTime.get(); + } + + /** + * A timer task for issuing FlowRequestMessage messages to nodes to retrieve + * an updated flow. + */ + private class FlowRetrieverTimerTask extends TimerTask { + + @Override + public void run() { + + resourceLock.lock(); + try { + // if flow is current, then we're done + if (isFlowCurrent()) { + return; + } + } catch (final Exception ex) { + logger.info("Encountered exception checking if flow is current caused by " + ex, ex); + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent"); + } + + final FlowRequestMessage request = new FlowRequestMessage(); + for (final NodeIdentifier nodeId : getNodeIds()) { + try { + // setup request + request.setNodeId(nodeId); + + // record request time + final long requestSentTime = new Date().getTime(); + + resourceLock.lock(); + try { + // sanity checks before making request + if (stopRequested.get()) { // did we receive a stop request + logger.debug("Stopping runnable prematurely because a request to stop was issued."); + return; + } else if (requestSentTime < retrievableAfterTime) { + /* + * Retrievable after time was updated while obtaining + * the lock, so try again later + */ + return; + } + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested"); + } + + // send request + final FlowResponseMessage response = sender.requestFlow(request); + + resourceLock.lock(); + try { + // check if the retrieved flow is still valid + if (requestSentTime > retrievableAfterTime) { + logger.info("Saving retrieved flow."); + + final StandardDataFlow dataFlow = response.getDataFlow(); + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + final ClusterDataFlow currentClusterDataFlow; + if (existingClusterDataFlow == null) { - currentClusterDataFlow = new ClusterDataFlow(dataFlow, null); ++ currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]); + } else { - currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId()); ++ currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), ++ existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks()); + } + flowDao.saveDataFlow(currentClusterDataFlow); + flowDao.setPersistedFlowState(PersistedFlowState.CURRENT); + lastRetrievalTime.set(new Date().getTime()); + } + + /* + * Retrievable after time was updated while requesting + * the flow, so try again later. + */ + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow"); + } + + } catch (final Throwable t) { + logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t); + } + } + } + } + + private static class TimingReentrantLock { + + private final Lock lock; + private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock"); + + private final ThreadLocal<Long> lockTime = new ThreadLocal<>(); + + public TimingReentrantLock(final Lock lock) { + this.lock = lock; + } + + public void lock() { + lock.lock(); + lockTime.set(System.nanoTime()); + } + + public void unlock(final String task) { + final long nanosLocked = System.nanoTime() - lockTime.get(); + lock.unlock(); + + final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS); + if (millisLocked > 100L) { + logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task); + } + } + } + }
