http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 1378d3b..fb06a15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -16,24 +16,6 @@ */ package org.apache.nifi.cluster.coordination.node; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - import org.apache.nifi.cluster.coordination.flow.FlowElection; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.protocol.ConnectionRequest; @@ -47,8 +29,12 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.events.EventReporter; import org.apache.nifi.services.FlowService; +import org.apache.nifi.state.MockStateMap; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.revision.RevisionManager; import org.junit.Assert; @@ -58,11 +44,33 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + public class TestNodeClusterCoordinator { private NodeClusterCoordinator coordinator; private ClusterCoordinationProtocolSenderListener senderListener; private List<NodeConnectionStatus> nodeStatuses; + private StateManagerProvider stateManagerProvider; private NiFiProperties createProperties() { final Map<String,String> addProps = new HashMap<>(); @@ -76,12 +84,18 @@ public class TestNodeClusterCoordinator { senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); nodeStatuses = Collections.synchronizedList(new ArrayList<>()); + stateManagerProvider = Mockito.mock(StateManagerProvider.class); + + final StateManager stateManager = Mockito.mock(StateManager.class); + when(stateManager.getState(any(Scope.class))).thenReturn(new MockStateMap(Collections.emptyMap(), 1)); + when(stateManagerProvider.getStateManager(anyString())).thenReturn(stateManager); + final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); - Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); + when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) { + coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null, stateManagerProvider) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { nodeStatuses.add(updatedStatus); @@ -90,7 +104,7 @@ public class TestNodeClusterCoordinator { final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>()); - Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow); + when(flowService.createDataFlow()).thenReturn(dataFlow); coordinator.setFlowService(flowService); } @@ -130,14 +144,14 @@ public class TestNodeClusterCoordinator { } @Test - public void testTryAgainIfNoFlowServiceSet() { + public void testTryAgainIfNoFlowServiceSet() throws IOException { final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); - Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); + when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), - null, revisionManager, createProperties(), null) { + null, revisionManager, createProperties(), null, stateManagerProvider) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@ -150,7 +164,7 @@ public class TestNodeClusterCoordinator { coordinator.setConnected(true); - final ProtocolMessage protocolResponse = coordinator.handle(requestMsg); + final ProtocolMessage protocolResponse = coordinator.handle(requestMsg, Collections.emptySet()); assertNotNull(protocolResponse); assertTrue(protocolResponse instanceof ConnectionResponseMessage); @@ -164,7 +178,7 @@ public class TestNodeClusterCoordinator { final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); final AtomicReference<ReconnectionRequestMessage> requestRef = new AtomicReference<>(); - Mockito.when(senderListener.requestReconnection(Mockito.any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() { + when(senderListener.requestReconnection(any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { final ReconnectionRequestMessage msg = invocation.getArgumentAt(0, ReconnectionRequestMessage.class); @@ -175,10 +189,10 @@ public class TestNodeClusterCoordinator { final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); - Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); + when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), - null, revisionManager, createProperties(), null) { + null, revisionManager, createProperties(), null, stateManagerProvider) { @Override void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) { } @@ -186,7 +200,7 @@ public class TestNodeClusterCoordinator { final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>()); - Mockito.when(flowService.createDataFlowFromController()).thenReturn(dataFlow); + when(flowService.createDataFlowFromController()).thenReturn(dataFlow); coordinator.setFlowService(flowService); coordinator.setConnected(true); @@ -232,7 +246,7 @@ public class TestNodeClusterCoordinator { @Test(timeout = 5000) public void testStatusChangesReplicated() throws InterruptedException, IOException { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); - Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); + when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); // Create a connection request message and send to the coordinator final NodeIdentifier requestedNodeId = createNodeId(1); @@ -397,7 +411,7 @@ public class TestNodeClusterCoordinator { final NodeStatusChangeMessage msg = new NodeStatusChangeMessage(); msg.setNodeId(nodeId1); msg.setNodeConnectionStatus(oldStatus); - coordinator.handle(msg); + coordinator.handle(msg, Collections.emptySet()); // Ensure that no status change message was send Thread.sleep(1000); @@ -413,7 +427,7 @@ public class TestNodeClusterCoordinator { final ConnectionRequestMessage crm = new ConnectionRequestMessage(); crm.setConnectionRequest(connectionRequest); - final ProtocolMessage response = coordinator.handle(crm); + final ProtocolMessage response = coordinator.handle(crm, Collections.emptySet()); assertNotNull(response); assertTrue(response instanceof ConnectionResponseMessage); final ConnectionResponseMessage responseMessage = (ConnectionResponseMessage) response; @@ -424,7 +438,7 @@ public class TestNodeClusterCoordinator { final ConnectionRequestMessage crm2 = new ConnectionRequestMessage(); crm2.setConnectionRequest(conRequest2); - final ProtocolMessage conflictingResponse = coordinator.handle(crm2); + final ProtocolMessage conflictingResponse = coordinator.handle(crm2, Collections.emptySet()); assertNotNull(conflictingResponse); assertTrue(conflictingResponse instanceof ConnectionResponseMessage); final ConnectionResponseMessage conflictingResponseMessage = (ConnectionResponseMessage) conflictingResponse; @@ -446,7 +460,7 @@ public class TestNodeClusterCoordinator { final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>())); final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage(); requestMsg.setConnectionRequest(request); - return coordinator.handle(requestMsg); + return coordinator.handle(requestMsg, Collections.emptySet()); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java index 45a2e42..3980865 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java @@ -219,7 +219,7 @@ public class ClusterConnectionIT { cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); final Node coordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS); - final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9285, null, false, null); + final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9286, "localhost", 9285, null, false, null); final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = new HashMap<>(); replacementStatuses.put(node1.getIdentifier(), new NodeConnectionStatus(node1.getIdentifier(), DisconnectionCode.USER_DISCONNECTED)); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index e0d8a97..3133736 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -17,17 +17,6 @@ package org.apache.nifi.cluster.integration; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.bundle.Bundle; @@ -73,6 +62,18 @@ import org.apache.nifi.web.revision.RevisionManager; import org.junit.Assert; import org.mockito.Mockito; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + public class Node { private final NodeIdentifier nodeId; private final NiFiProperties nodeProperties; @@ -133,7 +134,7 @@ public class Node { private static NodeIdentifier createNodeId() { - return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null); + return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null); } /** @@ -296,8 +297,13 @@ public class Node { } final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener); - return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, - revisionManager, nodeProperties, protocolSender); + try { + return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null, + revisionManager, nodeProperties, protocolSender); + } catch (IOException e) { + Assert.fail(e.toString()); + return null; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy index ffa3429..83d301b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy @@ -63,6 +63,6 @@ class ConnectionEntityMergerSpec extends Specification { } def createNodeIdentifier(int id) { - new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null) } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy index bb1d595..3997bec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy @@ -147,6 +147,6 @@ class ControllerServiceEntityMergerSpec extends Specification { } def createNodeIdentifier(int id) { - new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null) } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy index 028c864..0a485b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy @@ -55,6 +55,6 @@ class LabelEntityMergerSpec extends Specification { } def createNodeIdentifier(int id) { - new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null) + new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null) } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java deleted file mode 100644 index 6f55e79..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller; - -import org.apache.nifi.controller.queue.DropFlowFileState; -import org.apache.nifi.controller.queue.DropFlowFileStatus; -import org.apache.nifi.controller.queue.QueueSize; - -public class DropFlowFileRequest implements DropFlowFileStatus { - private final String identifier; - private final long submissionTime = System.currentTimeMillis(); - - private volatile QueueSize originalSize; - private volatile QueueSize currentSize; - private volatile QueueSize droppedSize = new QueueSize(0, 0L); - private volatile long lastUpdated = System.currentTimeMillis(); - private volatile String failureReason; - - private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK; - - - public DropFlowFileRequest(final String identifier) { - this.identifier = identifier; - } - - @Override - public String getRequestIdentifier() { - return identifier; - } - - @Override - public long getRequestSubmissionTime() { - return submissionTime; - } - - @Override - public QueueSize getOriginalSize() { - return originalSize; - } - - void setOriginalSize(final QueueSize originalSize) { - this.originalSize = originalSize; - } - - @Override - public QueueSize getCurrentSize() { - return currentSize; - } - - void setCurrentSize(final QueueSize queueSize) { - this.currentSize = queueSize; - } - - @Override - public QueueSize getDroppedSize() { - return droppedSize; - } - - void setDroppedSize(final QueueSize droppedSize) { - this.droppedSize = droppedSize; - } - - @Override - public synchronized DropFlowFileState getState() { - return state; - } - - @Override - public long getLastUpdated() { - return lastUpdated; - } - - @Override - public String getFailureReason() { - return failureReason; - } - - synchronized void setState(final DropFlowFileState state) { - setState(state, null); - } - - synchronized void setState(final DropFlowFileState state, final String explanation) { - this.state = state; - this.failureReason = explanation; - this.lastUpdated = System.currentTimeMillis(); - } - - synchronized boolean cancel() { - if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) { - return false; - } - - this.state = DropFlowFileState.CANCELED; - return true; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java new file mode 100644 index 0000000..69a0b92 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class DropFlowFileRequest implements DropFlowFileStatus { + private final String identifier; + private final long submissionTime = System.currentTimeMillis(); + + private volatile QueueSize originalSize; + private volatile QueueSize currentSize; + private volatile QueueSize droppedSize = new QueueSize(0, 0L); + private volatile long lastUpdated = System.currentTimeMillis(); + private volatile String failureReason; + + private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK; + + + public DropFlowFileRequest(final String identifier) { + this.identifier = identifier; + } + + @Override + public String getRequestIdentifier() { + return identifier; + } + + @Override + public long getRequestSubmissionTime() { + return submissionTime; + } + + @Override + public QueueSize getOriginalSize() { + return originalSize; + } + + public void setOriginalSize(final QueueSize originalSize) { + this.originalSize = originalSize; + } + + @Override + public QueueSize getCurrentSize() { + return currentSize; + } + + public void setCurrentSize(final QueueSize queueSize) { + this.currentSize = queueSize; + } + + @Override + public QueueSize getDroppedSize() { + return droppedSize; + } + + public void setDroppedSize(final QueueSize droppedSize) { + this.droppedSize = droppedSize; + } + + @Override + public synchronized DropFlowFileState getState() { + return state; + } + + @Override + public long getLastUpdated() { + return lastUpdated; + } + + @Override + public String getFailureReason() { + return failureReason; + } + + public synchronized void setState(final DropFlowFileState state) { + setState(state, null); + } + + public synchronized void setState(final DropFlowFileState state, final String explanation) { + this.state = state; + this.failureReason = explanation; + this.lastUpdated = System.currentTimeMillis(); + } + + public synchronized boolean cancel() { + if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) { + return false; + } + + this.state = DropFlowFileState.CANCELED; + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java index 5aeb5c5..b63be53 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java @@ -18,6 +18,8 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.claim.ContentClaim; +import java.util.Optional; + /** * */ @@ -25,23 +27,37 @@ public class ContentNotFoundException extends RuntimeException { private static final long serialVersionUID = 19048239082L; private final transient ContentClaim claim; + private final transient FlowFileRecord flowFile; public ContentNotFoundException(final ContentClaim claim) { super("Could not find content for " + claim); this.claim = claim; + this.flowFile = null; } public ContentNotFoundException(final ContentClaim claim, final Throwable t) { super("Could not find content for " + claim, t); this.claim = claim; + this.flowFile = null; } public ContentNotFoundException(final ContentClaim claim, final String message) { super("Could not find content for " + claim + ": " + message); this.claim = claim; + this.flowFile = null; + } + + public ContentNotFoundException(final FlowFileRecord flowFile, final ContentClaim claim, final String message) { + super("Could not find content for " + claim + ": " + message); + this.claim = claim; + this.flowFile = flowFile; } public ContentClaim getMissingClaim() { return claim; } + + public Optional<FlowFileRecord> getFlowFile() { + return Optional.ofNullable(flowFile); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 6172874..fe60585 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -29,18 +29,16 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.StandardFlowFileQueue; +import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueueFactory; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.FlowFileRepository; -import org.apache.nifi.controller.repository.FlowFileSwapManager; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.scheduling.SchedulingStrategy; import java.util.ArrayList; import java.util.Collection; @@ -60,7 +58,7 @@ import java.util.stream.Collectors; * one or more relationships that map the source component to the destination * component. */ -public final class StandardConnection implements Connection { +public final class StandardConnection implements Connection, ConnectionEventListener { private final String id; private final AtomicReference<ProcessGroup> processGroup; @@ -69,13 +67,16 @@ public final class StandardConnection implements Connection { private final Connectable source; private final AtomicReference<Connectable> destination; private final AtomicReference<Collection<Relationship>> relationships; - private final StandardFlowFileQueue flowFileQueue; private final AtomicInteger labelIndex = new AtomicInteger(1); private final AtomicLong zIndex = new AtomicLong(0L); private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ProcessScheduler scheduler; + private final FlowFileQueueFactory flowFileQueueFactory; + private final boolean clustered; private final int hashCode; + private volatile FlowFileQueue flowFileQueue; + private StandardConnection(final Builder builder) { id = builder.id; name = new AtomicReference<>(builder.name); @@ -85,9 +86,10 @@ public final class StandardConnection implements Connection { destination = new AtomicReference<>(builder.destination); relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; - flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold, - builder.defaultBackPressureObjectThreshold, builder.defaultBackPressureDataSizeThreshold); + flowFileQueueFactory = builder.flowFileQueueFactory; + clustered = builder.clustered; + + flowFileQueue = flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -148,6 +150,20 @@ public final class StandardConnection implements Connection { } @Override + public void triggerDestinationEvent() { + if (getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { + scheduler.registerEvent(getDestination()); + } + } + + @Override + public void triggerSourceEvent() { + if (getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { + scheduler.registerEvent(getSource()); + } + } + + @Override public Authorizable getSourceAuthorizable() { final Connectable sourceConnectable = getSource(); final Authorizable sourceAuthorizable; @@ -297,7 +313,7 @@ public final class StandardConnection implements Connection { throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); } - if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) { + if (getFlowFileQueue().isUnacknowledgedFlowFile()) { throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination); } @@ -354,7 +370,7 @@ public final class StandardConnection implements Connection { @Override public String toString() { - return "Connection[Source ID=" + id + ",Dest ID=" + getDestination().getIdentifier() + "]"; + return "Connection[ID=" + getIdentifier() + ", Source ID=" + getSource().getIdentifier() + ", Dest ID=" + getDestination().getIdentifier() + "]"; } /** @@ -386,14 +402,8 @@ public final class StandardConnection implements Connection { private Connectable source; private Connectable destination; private Collection<Relationship> relationships; - private FlowFileSwapManager swapManager; - private EventReporter eventReporter; - private FlowFileRepository flowFileRepository; - private ProvenanceEventRepository provenanceRepository; - private ResourceClaimManager resourceClaimManager; - private int queueSwapThreshold; - private Long defaultBackPressureObjectThreshold; - private String defaultBackPressureDataSizeThreshold; + private FlowFileQueueFactory flowFileQueueFactory; + private boolean clustered = false; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -440,43 +450,13 @@ public final class StandardConnection implements Connection { return this; } - public Builder swapManager(final FlowFileSwapManager swapManager) { - this.swapManager = swapManager; - return this; - } - - public Builder eventReporter(final EventReporter eventReporter) { - this.eventReporter = eventReporter; - return this; - } - - public Builder flowFileRepository(final FlowFileRepository flowFileRepository) { - this.flowFileRepository = flowFileRepository; + public Builder flowFileQueueFactory(final FlowFileQueueFactory flowFileQueueFactory) { + this.flowFileQueueFactory = flowFileQueueFactory; return this; } - public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) { - this.provenanceRepository = provenanceRepository; - return this; - } - - public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) { - this.resourceClaimManager = resourceClaimManager; - return this; - } - - public Builder queueSwapThreshold(final int queueSwapThreshold) { - this.queueSwapThreshold = queueSwapThreshold; - return this; - } - - public Builder defaultBackPressureObjectThreshold(final long defaultBackPressureObjectThreshold) { - this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold; - return this; - } - - public Builder defaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) { - this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold; + public Builder clustered(final boolean clustered) { + this.clustered = clustered; return this; } @@ -487,17 +467,8 @@ public final class StandardConnection implements Connection { if (destination == null) { throw new IllegalStateException("Cannot build a Connection without a Destination"); } - if (swapManager == null) { - throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager"); - } - if (flowFileRepository == null) { - throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository"); - } - if (provenanceRepository == null) { - throw new IllegalStateException("Cannot build a Connection without a Provenance Repository"); - } - if (resourceClaimManager == null) { - throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager"); + if (flowFileQueueFactory == null) { + throw new IllegalStateException("Cannot build a Connection without a FlowFileQueueFactory"); } if (relationships == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 208bbce..5f8f925 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -16,6 +16,26 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.SwapContents; +import org.apache.nifi.controller.repository.SwapManagerInitializationContext; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.swap.SchemaSwapDeserializer; +import org.apache.nifi.controller.swap.SchemaSwapSerializer; +import org.apache.nifi.controller.swap.SimpleSwapDeserializer; +import org.apache.nifi.controller.swap.SwapDeserializer; +import org.apache.nifi.controller.swap.SwapSerializer; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -29,34 +49,19 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.FlowFileRepository; -import org.apache.nifi.controller.repository.FlowFileSwapManager; -import org.apache.nifi.controller.repository.SwapContents; -import org.apache.nifi.controller.repository.SwapManagerInitializationContext; -import org.apache.nifi.controller.repository.SwapSummary; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.swap.SchemaSwapDeserializer; -import org.apache.nifi.controller.swap.SchemaSwapSerializer; -import org.apache.nifi.controller.swap.SimpleSwapDeserializer; -import org.apache.nifi.controller.swap.SwapDeserializer; -import org.apache.nifi.controller.swap.SwapSerializer; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * <p> @@ -66,9 +71,8 @@ import org.slf4j.LoggerFactory; */ public class FileSystemSwapManager implements FlowFileSwapManager { - public static final int MINIMUM_SWAP_COUNT = 10000; - private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); - private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); + private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap"); + private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part"); public static final int SWAP_ENCODING_VERSION = 10; public static final String EVENT_CATEGORY = "Swap FlowFiles"; @@ -106,13 +110,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager { this.flowFileRepository = initializationContext.getFlowFileRepository(); } + @Override - public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue) throws IOException { + public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException { if (toSwap == null || toSwap.isEmpty()) { return null; } - final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString() + ".swap"); + final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString(); + final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName; + final String swapFileName = swapFileBaseName + ".swap"; + + final File swapFile = new File(storageDirectory, swapFileName); final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part"); final String swapLocation = swapFile.getAbsolutePath(); @@ -185,8 +194,55 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } + private String getOwnerQueueIdentifier(final File swapFile) { + final String[] splits = swapFile.getName().split("-"); + if (splits.length > 6) { + final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5]; + return queueIdentifier; + } + + return null; + } + + private String getOwnerPartition(final File swapFile) { + final String filename = swapFile.getName(); + final int indexOfDot = filename.indexOf("."); + if (indexOfDot < 1) { + return null; + } + + final int lastIndexOfDot = filename.lastIndexOf("."); + if (lastIndexOfDot == indexOfDot) { + return null; + } + + return filename.substring(indexOfDot + 1, lastIndexOfDot); + } + + @Override + public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) { + final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String name) { + return SWAP_FILE_PATTERN.matcher(name).matches(); + } + }); + + if (swapFiles == null) { + return Collections.emptySet(); + } + + final String queueId = queue.getIdentifier(); + + return Stream.of(swapFiles) + .filter(swapFile -> queueId.equals(getOwnerQueueIdentifier(swapFile))) + .map(this::getOwnerPartition) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + @Override - public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { + public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue, final String partitionName) throws IOException { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @Override public boolean accept(final File dir, final String name) { @@ -212,15 +268,21 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } // split the filename by dashes. The old filenaming scheme was "<timestamp>-<randomuuid>.swap" but the new naming scheme is - // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal - // to the id of the queue given and if not we can just move on. - final String[] splits = swapFile.getName().split("-"); - if (splits.length > 6) { - final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5]; - if (queueIdentifier.equals(flowFileQueue.getIdentifier())) { - swapLocations.add(swapFile.getAbsolutePath()); + // "<timestamp>-<queue identifier>-<random uuid>.[partition name.]swap". + final String ownerQueueId = getOwnerQueueIdentifier(swapFile); + if (ownerQueueId != null) { + if (!ownerQueueId.equals(flowFileQueue.getIdentifier())) { + continue; + } + + if (partitionName != null) { + final String ownerPartition = getOwnerPartition(swapFile); + if (!partitionName.equals(ownerPartition)) { + continue; + } } + swapLocations.add(swapFile.getAbsolutePath()); continue; } @@ -357,4 +419,28 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } } + + @Override + public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException { + final File existingFile = new File(swapLocation); + if (!existingFile.exists()) { + throw new FileNotFoundException("Could not change name of partition for swap location " + swapLocation + " because no swap file exists at that location"); + } + + final String existingFilename = existingFile.getName(); + + final String newFilename; + final int dotIndex = existingFilename.indexOf("."); + if (dotIndex < 0) { + newFilename = existingFilename + "." + newPartitionName + ".swap"; + } else { + newFilename = existingFilename.substring(0, dotIndex) + "." + newPartitionName + ".swap"; + } + + final File newFile = new File(existingFile.getParentFile(), newFilename); + // Use Files.move and convert to Path's instead of File.rename so that we get an IOException on failure that describes why we failed. + Files.move(existingFile.toPath(), newFile.toPath()); + + return newFile.getAbsolutePath(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 21c61e9..ebd809c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.DataAuthorizable; import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.util.IdentityMapping; +import org.apache.nifi.authorization.util.IdentityMappingUtil; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.coordination.ClusterCoordinator; @@ -76,8 +78,23 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener; +import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueueFactory; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.StandardFlowFileQueue; +import org.apache.nifi.controller.queue.clustered.ContentRepositoryFlowFileAccess; +import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec; +import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory; +import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask; +import org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer; +import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer; +import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer; +import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol; +import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; @@ -243,6 +260,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -324,6 +342,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final VariableRegistry variableRegistry; private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>(); + private final ConnectionLoadBalanceServer loadBalanceServer; + private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry; + private final FlowEngine loadBalanceClientThreadPool; + private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<>(); + private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>(); @@ -673,8 +696,40 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.start(); heartbeatMonitor.start(); + + final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress(); + // Setup Load Balancing Server + final EventReporter eventReporter = createEventReporter(bulletinRepository); + final List<IdentityMapping> identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties); + final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter); + final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection); + + final int numThreads = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT); + final String timeoutPeriod = nifiProperties.getProperty(NiFiProperties.LOAD_BALANCE_COMMS_TIMEOUT, NiFiProperties.DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT); + final int timeoutMillis = (int) FormatUtils.getTimeDuration(timeoutPeriod, TimeUnit.MILLISECONDS); + + loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext, + numThreads, loadBalanceProtocol, eventReporter, timeoutMillis); + + + final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE); + final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository), + eventReporter, new StandardLoadBalanceFlowFileCodec()); + loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode); + + final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT); + loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true); + + for (int i=0; i < loadBalanceClientThreadCount; i++) { + final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter); + loadBalanceClientTasks.add(clientTask); + loadBalanceClientThreadPool.submit(clientTask); + } } else { + loadBalanceClientRegistry = null; heartbeater = null; + loadBalanceServer = null; + loadBalanceClientThreadPool = null; } } @@ -775,6 +830,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R listener.start(); } + if (loadBalanceServer != null) { + loadBalanceServer.start(); + } + notifyComponentsConfigurationRestored(); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @@ -940,11 +999,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectablesAfterInitialization.clear(); startRemoteGroupPortsAfterInitialization.clear(); } + + for (final Connection connection : getRootGroup().findAllConnections()) { + connection.getFlowFileQueue().startLoadBalancing(); + } } finally { writeLock.unlock("onFlowInitialized"); } } + public boolean isStartAfterInitialization(final Connectable component) { + return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component); + } + private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -1040,20 +1107,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R swapManager.initialize(initializationContext); } - return builder.id(requireNonNull(id).intern()) + final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() { + @Override + public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) { + final FlowFileQueue flowFileQueue; + + if (clusterCoordinator == null) { + flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager, + eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold()); + } else { + flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager, + clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter); + + flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold()); + flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold()); + } + + return flowFileQueue; + } + }; + + final Connection connection = builder.id(requireNonNull(id).intern()) .name(name == null ? null : name.intern()) .relationships(relationships) .source(requireNonNull(source)) .destination(destination) - .swapManager(swapManager) - .queueSwapThreshold(nifiProperties.getQueueSwapThreshold()) - .defaultBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold()) - .defaultBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold()) - .eventReporter(eventReporter) - .resourceClaimManager(resourceClaimManager) - .flowFileRepository(flowFileRepository) - .provenanceRepository(provenanceRepository) + .flowFileQueueFactory(flowFileQueueFactory) .build(); + + return connection; } /** @@ -1561,6 +1643,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R zooKeeperStateServer.shutdown(); } + if (loadBalanceClientThreadPool != null) { + loadBalanceClientThreadPool.shutdownNow(); + } + loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop); + // Trigger any processors' methods marked with @OnShutdown to be called getRootGroup().shutdown(); @@ -1606,6 +1693,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R listener.stop(); } + if (loadBalanceServer != null) { + loadBalanceServer.stop(); + } + + if (loadBalanceClientRegistry != null) { + loadBalanceClientRegistry.stop(); + } + if (processScheduler != null) { processScheduler.shutdown(); } @@ -2226,6 +2321,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R queue.setPriorities(newPrioritizers); } + final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy(); + if (loadBalanceStrategyName != null) { + final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName); + final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute(); + queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute); + } + connection.setProcessGroup(group); group.addConnection(connection); } @@ -2737,6 +2839,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void onConnectionAdded(final Connection connection) { allConnections.put(connection.getIdentifier(), connection); + + if (isInitialized()) { + connection.getFlowFileQueue().startLoadBalancing(); + } } public void onConnectionRemoved(final Connection connection) { @@ -3494,6 +3600,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public void stopTransmitting(final RemoteGroupPort remoteGroupPort) { + writeLock.lock(); + try { + if (initialized.get()) { + remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort); + } else { + startRemoteGroupPortsAfterInitialization.remove(remoteGroupPort); + } + } finally { + writeLock.unlock("stopTransmitting"); + } + } + public void stopProcessor(final String parentGroupId, final String processorId) { final ProcessGroup group = lookupGroup(parentGroupId); final ProcessorNode node = group.getProcessor(processorId); @@ -4344,10 +4463,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.start(); stateManagerProvider.enableClusterProvider(); + loadBalanceClientRegistry.start(); + heartbeat(); } else { stateManagerProvider.disableClusterProvider(); - setPrimary(false); } @@ -4369,6 +4489,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + + /** * @return true if this instance is the primary node in the cluster; false * otherwise
