http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java new file mode 100644 index 0000000..10d584e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +public class FirstNodePartitioner implements FlowFilePartitioner { /* Partitioner that sends every FlowFile to the first partition of the array it is given. */ + + @Override + public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) { + return partitions[0]; /* flowFile and localPartition are not consulted; an empty array fails with ArrayIndexOutOfBoundsException */ + } + + @Override + public boolean isRebalanceOnClusterResize() { + return true; + } + + @Override + public boolean isRebalanceOnFailure() { + return false; + } + + @Override + public boolean isPartitionStatic() { + return true; /* getPartition never inspects the FlowFile, so for a given partitions array the answer is the same for every FlowFile */ + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java new file mode 100644 index 0000000..4f528e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +public interface FlowFilePartitioner { /* Strategy that chooses the QueuePartition for a FlowFile and reports when queued FlowFiles should be rebalanced. */ + + /** + * Determines which partition the given FlowFile should go to + * + * @param flowFile the FlowFile to partition + * @param partitions the partitions to choose from + * @param localPartition the local partition, which is also included in the given array of partitions + * @return the partition for the FlowFile + */ + QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition); + + /** + * @return <code>true</code> if a change in the size of a cluster should result in re-balancing all FlowFiles in the queue, + * <code>false</code> if a change in the size of a cluster does not require re-balancing. + */ + boolean isRebalanceOnClusterResize(); + + /** + * @return <code>true</code> if FlowFiles should be rebalanced to another partition if they cannot be sent to the designated peer, + * <code>false</code> if a failure should result in the FlowFiles remaining in the same partition. + */ + boolean isRebalanceOnFailure(); + + /** + * @return <code>true</code> if the return value of {@link #getPartition(FlowFileRecord, QueuePartition[], QueuePartition)} will be the same + * regardless of how many times it is called or which FlowFiles are passed. The default implementation returns <code>false</code>. 
+ */ + default boolean isPartitionStatic() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java new file mode 100644 index 0000000..0f9f9f7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +public class LocalPartitionPartitioner implements FlowFilePartitioner { /* Partitioner that keeps every FlowFile on the local partition it is handed. */ + @Override + public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) { + return localPartition; /* flowFile and the partitions array are not consulted */ + } + + @Override + public boolean isRebalanceOnClusterResize() { + return false; + } + + @Override + public boolean isRebalanceOnFailure() { + return false; + } + + @Override + public boolean isPartitionStatic() { + return true; /* getPartition never inspects the FlowFile, so the answer is the same for every FlowFile */ + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java new file mode 100644 index 0000000..9ee0e0e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.processor.FlowFileFilter; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * An extension of a Queue Partition that contains the methods necessary for Processors, Funnels, and Ports to interact with the Partition + * as if it were an entire FlowFile Queue itself. + */ +public interface LocalQueuePartition extends QueuePartition { + /** + * @return <code>true</code> if the active queue is empty, <code>false</code> otherwise + */ + boolean isActiveQueueEmpty(); + + /** + * @return <code>true</code> if there is at least one FlowFile that has not yet been acknowledged, <code>false</code> if all FlowFiles have been acknowledged. 
+ */ + boolean isUnacknowledgedFlowFile(); + + /** + * Returns a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available + * + * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @return a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available + */ + FlowFileRecord poll(Set<FlowFileRecord> expiredRecords); + + /** + * Returns up to <code>maxResults</code> FlowFiles from the queue + * + * @param maxResults the maximum number of FlowFiles to return + * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition + */ + List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords); + + /** + * Returns a List of FlowFiles that match the given filter + * + * @param filter the filter to determine whether or not a given FlowFile should be returned + * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter + */ + List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); + + /** + * Acknowledges that the given FlowFile has been accounted for and is no longer the responsibility of this partition + * @param flowFile the FlowFile that has been accounted for + */ + void acknowledge(FlowFileRecord flowFile); + + /** + * Acknowledges that the given FlowFiles have been accounted for and are no longer the responsibility of this partition + * @param flowFiles the FlowFiles that have been accounted for + */ + void acknowledge(Collection<FlowFileRecord> flowFiles); + + /** + * Returns the FlowFile with the given 
UUID, or <code>null</code> if the FlowFile with that UUID is not found in the partition + * + * @param flowFileUuid the UUID of the FlowFile + * @return the FlowFile with the given UUID or <code>null</code> if the FlowFile cannot be found + * @throws IOException if unable to read swapped data from a swap file + */ + FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException; + + /** + * Returns the FlowFiles that can be provided as the result of a "List FlowFiles" action + * @return a List of FlowFiles + */ + List<FlowFileRecord> getListableFlowFiles(); + + /** + * Inherits the contents of another queue/partition + * @param queueContents the contents to inherit + */ + void inheritQueueContents(FlowFileQueueContents queueContents); + + /** + * @return diagnostics information about the queue partition + */ + LocalQueuePartitionDiagnostics getQueueDiagnostics(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java new file mode 100644 index 0000000..c73525b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.flowfile.FlowFilePrioritizer; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +/** + * Represents a portion of a FlowFile Queue such that a FlowFile Queue can be broken into + * a local queue partition and 0 or more Remote Queue Partitions. + */ +public interface QueuePartition { + /** + * Discovers any FlowFiles that have been swapped out, returning a summary of the swap files' contents + * @return a summary of the swap files' contents + */ + SwapSummary recoverSwappedFlowFiles(); + + /** + * @return the Node Identifier that this Queue Partition corresponds to, or an empty Optional if the Node Identifier is not yet known. 
+ */ + Optional<NodeIdentifier> getNodeIdentifier(); + + /** + * @return the name of the Partition that is used when serializing swap flowfiles in order to denote that a swap file belongs to this partition + */ + String getSwapPartitionName(); + + /** + * Adds the given FlowFile to this partition + * @param flowFile the FlowFile to add + */ + void put(FlowFileRecord flowFile); + + /** + * Adds the given FlowFiles to this partition + * @param flowFiles the FlowFiles to add + */ + void putAll(Collection<FlowFileRecord> flowFiles); + + /** + * Drops the FlowFiles in this partition + * @param dropRequest the FlowFile Drop Request + * @param requestor the user making the request + */ + void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor); + + /** + * Updates the prioritizers to use when queueing data + * @param newPriorities the new priorities + */ + void setPriorities(List<FlowFilePrioritizer> newPriorities); + + /** + * Starts distributing FlowFiles to their desired destinations + * + * @param flowFilePartitioner the Partitioner that is being used to determine which FlowFiles should belong to this Partition + */ + void start(FlowFilePartitioner flowFilePartitioner); + + /** + * Stops distributing FlowFiles to other nodes in the cluster. This does not interrupt any active transactions but will cause the + * partition to not create any more transactions until it is started again. + */ + void stop(); + + /** + * Provides a {@link FlowFileQueueContents} that can be transferred to another partition + * @param newPartitionName the name of the partition to which the data is being transferred (see {@link #getSwapPartitionName()}). 
+ * @return the contents of the queue + */ + FlowFileQueueContents packageForRebalance(String newPartitionName); + + /** + * @return the current size of the partition's queue + */ + QueueSize size(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java new file mode 100644 index 0000000..9713814 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.Collection; + +/** + * A partition whose sole job it is to redistribute FlowFiles to the appropriate partitions. + */ +public interface RebalancingPartition extends QueuePartition { + + /** + * Inherits all of the FlowFiles, including FlowFiles that have been swapped out, in order to + * redistribute them across the cluster + * + * @param queueContents the contents of a FlowFileQueue (or partition) + */ + void rebalance(FlowFileQueueContents queueContents); + + /** + * Inherits all of the given FlowFiles in order to redistribute them across the cluster + * + * @param flowFiles the FlowFiles to redistribute + */ + void rebalance(Collection<FlowFileRecord> flowFiles); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java new file mode 100644 index 0000000..a78de55 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.SwappablePriorityQueue; +import org.apache.nifi.controller.queue.clustered.TransferFailureDestination; +import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback; +import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback; +import org.apache.nifi.controller.repository.ContentNotFoundException; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import 
org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * A Queue Partition that is responsible for transferring FlowFiles to another node in the cluster.
 * <p>
 * FlowFiles put into this partition are held in the supplied {@link SwappablePriorityQueue}. When the partition is
 * {@link #start(FlowFilePartitioner) started} it registers with the {@link AsyncLoadBalanceClientRegistry}, handing over
 * a supplier of FlowFiles plus the callbacks that settle the queue and the repositories once a transaction
 * succeeds or fails.
 */
public class RemoteQueuePartition implements QueuePartition {
    private static final Logger logger = LoggerFactory.getLogger(RemoteQueuePartition.class);

    private final NodeIdentifier nodeIdentifier;
    private final SwappablePriorityQueue priorityQueue;
    private final LoadBalancedFlowFileQueue flowFileQueue;
    private final TransferFailureDestination failureDestination;

    private final FlowFileRepository flowFileRepo;
    private final ProvenanceEventRepository provRepo;
    private final ContentRepository contentRepo;
    private final AsyncLoadBalanceClientRegistry clientRegistry;

    // Guarded by the monitor of this object (only touched from the synchronized start/stop methods).
    private boolean running = false;
    private final String description;

    public RemoteQueuePartition(final NodeIdentifier nodeId, final SwappablePriorityQueue priorityQueue, final TransferFailureDestination failureDestination,
                                final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ContentRepository contentRepository,
                                final AsyncLoadBalanceClientRegistry clientRegistry, final LoadBalancedFlowFileQueue flowFileQueue) {

        this.nodeIdentifier = nodeId;
        this.priorityQueue = priorityQueue;
        this.flowFileQueue = flowFileQueue;
        this.failureDestination = failureDestination;
        this.flowFileRepo = flowFileRepo;
        this.provRepo = provRepo;
        this.contentRepo = contentRepository;
        this.clientRegistry = clientRegistry;
        this.description = "RemoteQueuePartition[queueId=" + flowFileQueue.getIdentifier() + ", nodeId=" + nodeIdentifier + "]";
    }

    @Override
    public QueueSize size() {
        return priorityQueue.size();
    }

    /**
     * @return the id of the node this partition sends to, used as the swap partition name
     */
    @Override
    public String getSwapPartitionName() {
        return nodeIdentifier.getId();
    }

    @Override
    public Optional<NodeIdentifier> getNodeIdentifier() {
        return Optional.ofNullable(nodeIdentifier);
    }

    @Override
    public void put(final FlowFileRecord flowFile) {
        priorityQueue.put(flowFile);
    }

    @Override
    public void putAll(final Collection<FlowFileRecord> flowFiles) {
        priorityQueue.putAll(flowFiles);
    }

    @Override
    public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
        priorityQueue.dropFlowFiles(dropRequest, requestor);
    }

    @Override
    public SwapSummary recoverSwappedFlowFiles() {
        return priorityQueue.recoverSwappedFlowFiles();
    }

    @Override
    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
        return priorityQueue.packageForRebalance(newPartitionName);
    }

    @Override
    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
        priorityQueue.setPriorities(newPriorities);
    }

    /**
     * Polls the next FlowFile to send, handing any FlowFiles that expired along the way to the owning queue.
     *
     * @return the next FlowFile, or whatever the underlying queue's poll returns when nothing is available
     */
    private FlowFileRecord getFlowFile() {
        final Set<FlowFileRecord> expired = new HashSet<>();
        final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
        flowFileQueue.handleExpiredRecords(expired);
        return flowFile;
    }

    /**
     * Registers this partition with the client registry so that its FlowFiles are sent to the remote node.
     * Calling this method while the partition is already running is a no-op.
     *
     * @param partitioner the partitioner in use; passed through to the failure destination when FlowFiles cannot be sent
     */
    @Override
    public synchronized void start(final FlowFilePartitioner partitioner) {
        if (running) {
            return;
        }

        final TransactionFailureCallback failureCallback = new TransactionFailureCallback() {
            @Override
            public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase phase) {
                // In the case of failure, we need to acknowledge the FlowFiles that were removed from the queue,
                // and then put the FlowFiles back, or transfer them to another partition. We do not call
                // flowFileQueue#onTransfer in the case of failure, though, because the size of the FlowFileQueue itself
                // has not changed. The FlowFiles were just re-queued or moved between partitions.
                priorityQueue.acknowledge(flowFiles);

                if (cause instanceof ContentNotFoundException) {
                    // Handle ContentNotFound by creating a RepositoryRecord for the FlowFile and marking as aborted, then updating the
                    // FlowFiles and Provenance Repositories accordingly. This follows the same pattern as StandardProcessSession so that
                    // we have a consistent way of handling this case.
                    final Optional<FlowFileRecord> optionalFlowFile = ((ContentNotFoundException) cause).getFlowFile();
                    if (optionalFlowFile.isPresent()) {
                        final FlowFileRecord flowFile = optionalFlowFile.get();

                        // Everything except the FlowFile whose content is missing still has to be re-queued.
                        final List<FlowFileRecord> successfulFlowFiles = new ArrayList<>(flowFiles);
                        successfulFlowFiles.remove(flowFile);

                        final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, flowFile);
                        repoRecord.markForAbort();

                        updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord));
                        transferToFailureDestination(successfulFlowFiles, phase, partitioner);

                        flowFileQueue.onTransfer(Collections.singleton(flowFile)); // Want to ensure that we update queue size because FlowFile won't be re-queued.

                        return;
                    }
                }

                transferToFailureDestination(flowFiles, phase, partitioner);
            }

            @Override
            public boolean isRebalanceOnFailure() {
                return failureDestination.isRebalanceOnFailure(partitioner);
            }
        };

        final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() {
            @Override
            public void onTransactionComplete(final List<FlowFileRecord> flowFilesSent) {
                // We've now completed the transaction. We must now update the repositories and "keep the books", acknowledging the FlowFiles
                // with the queue so that its size remains accurate.
                priorityQueue.acknowledge(flowFilesSent);
                flowFileQueue.onTransfer(flowFilesSent);
                updateRepositories(flowFilesSent, Collections.emptyList());
            }
        };

        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, priorityQueue::isEmpty, this::getFlowFile,
            failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);

        running = true;
    }

    /**
     * Hands FlowFiles that could not be sent to the failure destination.
     * If unable to even connect to the node, everything still queued in this partition is transferred as well.
     *
     * @param unsentFlowFiles the FlowFiles that were pulled from the queue but not delivered
     * @param phase           the phase in which the transaction failed
     * @param partitioner     the partitioner that was in use when the partition was started
     */
    private void transferToFailureDestination(final List<FlowFileRecord> unsentFlowFiles, final TransactionPhase phase, final FlowFilePartitioner partitioner) {
        if (phase == TransactionPhase.CONNECTING) {
            failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
        }
        failureDestination.putAll(unsentFlowFiles, partitioner);
    }

    /**
     * Unregisters this partition from the client registry. Does not change the running flag.
     */
    public void onRemoved() {
        clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
    }


    /**
     * Updates the FlowFileRepository, Provenance Repository, and claimant counts in the Content Repository.
     *
     * @param flowFilesSent the FlowFiles that were sent to another node.
     * @param abortedRecords the Repository Records for any FlowFile whose content was missing.
     */
    private void updateRepositories(final List<FlowFileRecord> flowFilesSent, final Collection<RepositoryRecord> abortedRecords) {
        // We update the Provenance Repository first. This way, even if we restart before we update the FlowFile repo, we have the record
        // that the data was sent in the Provenance Repository. We then update the content claims and finally the FlowFile Repository. We do it
        // in this order so that when the FlowFile repo is sync'ed to disk, we know which Content Claims are no longer in use. Updating the FlowFile
        // Repo first could result in holding those Content Claims on disk longer than we need to.
        //
        // Additionally, we are iterating over the FlowFiles sent multiple times. We could refactor this to iterate over them just once and then
        // create the Provenance Events and Repository Records in a single pass. Doing so, however, would mean that we need to keep both collections
        // of objects in heap at the same time. Using multiple passes allows the Provenance Events to be freed from heap by the GC before the Repo Records
        // are ever created.
        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size());
        for (final FlowFileRecord sent : flowFilesSent) {
            provenanceEvents.add(createSendEvent(sent));
            provenanceEvents.add(createDropEvent(sent));
        }

        for (final RepositoryRecord abortedRecord : abortedRecords) {
            final FlowFileRecord abortedFlowFile = abortedRecord.getCurrent();
            provenanceEvents.add(createDropEvent(abortedFlowFile, "Content Not Found"));
        }

        provRepo.registerEvents(provenanceEvents);

        // Update the FlowFile Repository & content claim counts last
        final List<RepositoryRecord> flowFileRepoRecords = flowFilesSent.stream()
            .map(this::createRepositoryRecord)
            .collect(Collectors.toCollection(ArrayList::new));

        flowFileRepoRecords.addAll(abortedRecords);

        // Decrement claimant count for each FlowFile.
        flowFileRepoRecords.stream()
            .map(RepositoryRecord::getCurrentClaim)
            .forEach(contentRepo::decrementClaimantCount);

        try {
            flowFileRepo.updateRepository(flowFileRepoRecords);
        } catch (final Exception e) {
            // Log the node identifier itself; getNodeIdentifier() wraps it in an Optional, which would render as "Optional[...]".
            logger.error("Unable to update FlowFile repository to indicate that {} FlowFiles have been transferred to {}. "
                + "It is possible that these FlowFiles will be duplicated upon restart of NiFi.", flowFilesSent.size(), nodeIdentifier, e);
        }
    }

    /**
     * Creates a Repository Record that marks the given FlowFile for deletion from this queue.
     */
    private RepositoryRecord createRepositoryRecord(final FlowFileRecord flowFile) {
        final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, flowFile);
        record.markForDelete();
        return record;
    }

    /**
     * Builds the SEND provenance event recorded for a FlowFile that was transferred to the remote node.
     */
    private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile) {
        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
            .fromFlowFile(flowFile)
            .setEventType(ProvenanceEventType.SEND)
            .setDetails("Re-distributed for Load-balanced connection")
            .setComponentId(flowFileQueue.getIdentifier())
            .setComponentType("Connection")
            .setSourceQueueIdentifier(flowFileQueue.getIdentifier())
            .setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key()))
            .setTransitUri("nifi:connection:" + flowFileQueue.getIdentifier());

        setContentClaims(builder, flowFile);
        return builder.build();
    }

    /**
     * Builds a DROP provenance event without any details.
     */
    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile) {
        return createDropEvent(flowFile, null);
    }

    /**
     * Builds the DROP provenance event recorded when a FlowFile leaves this node's queue.
     *
     * @param flowFile the FlowFile being dropped
     * @param details  the event details, may be <code>null</code>
     */
    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String details) {
        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
            .fromFlowFile(flowFile)
            .setEventType(ProvenanceEventType.DROP)
            .setDetails(details)
            .setComponentId(flowFileQueue.getIdentifier())
            .setComponentType("Connection")
            .setSourceQueueIdentifier(flowFileQueue.getIdentifier());

        setContentClaims(builder, flowFile);
        return builder.build();
    }

    /**
     * Copies the FlowFile's content claim, if it has one, into the builder as both the current and the previous claim.
     */
    private void setContentClaims(final ProvenanceEventBuilder builder, final FlowFileRecord flowFile) {
        final ContentClaim contentClaim = flowFile.getContentClaim();
        if (contentClaim == null) {
            return;
        }

        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
        final long offset = contentClaim.getOffset() + flowFile.getContentClaimOffset();

        builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, flowFile.getSize());
        builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, flowFile.getSize());
    }


    /**
     * Marks the partition as not running and unregisters it from the client registry.
     */
    @Override
    public synchronized void stop() {
        running = false;
        clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
    }

    public RemoteQueuePartitionDiagnostics getDiagnostics() {
        return new StandardRemoteQueuePartitionDiagnostics(nodeIdentifier.toString(), priorityQueue.getFlowFileQueueSize());
    }

    @Override
    public String toString() {
        return description;
    }
}

// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
// new file mode 100644
// index 0000000..c08724d
// --- /dev/null
// +++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.concurrent.atomic.AtomicLong; + +public class RoundRobinPartitioner implements FlowFilePartitioner { + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) { + final long count = counter.getAndIncrement(); + final int index = (int) (count % partitions.length); + return partitions[index]; + } + + @Override + public boolean isRebalanceOnClusterResize() { + return false; + } + + + @Override + public boolean isRebalanceOnFailure() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java new file mode 100644 index 0000000..74a8aa6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.BlockingSwappablePriorityQueue; +import org.apache.nifi.controller.queue.DropFlowFileAction; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFilePrioritizer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class StandardRebalancingPartition implements RebalancingPartition { + private final String SWAP_PARTITION_NAME = "rebalance"; + private final String queueIdentifier; + private final BlockingSwappablePriorityQueue queue; + private final LoadBalancedFlowFileQueue flowFileQueue; + private final String description; + + private volatile boolean stopped = true; + private RebalanceTask rebalanceTask; + + + public StandardRebalancingPartition(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, + final LoadBalancedFlowFileQueue flowFileQueue, final DropFlowFileAction dropAction) { + + this.queue = new BlockingSwappablePriorityQueue(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, SWAP_PARTITION_NAME); + this.queueIdentifier = flowFileQueue.getIdentifier(); + this.flowFileQueue = flowFileQueue; + this.description = "RebalancingPartition[queueId=" + queueIdentifier + "]"; + } + + @Override + public Optional<NodeIdentifier> getNodeIdentifier() { + return 
Optional.empty(); + } + + @Override + public QueueSize size() { + return queue.size(); + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + return this.queue.recoverSwappedFlowFiles(); + } + + @Override + public String getSwapPartitionName() { + return SWAP_PARTITION_NAME; + } + + @Override + public void put(final FlowFileRecord flowFile) { + queue.put(flowFile); + } + + @Override + public void putAll(final Collection<FlowFileRecord> flowFiles) { + queue.putAll(flowFiles); + } + + @Override + public void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) { + queue.dropFlowFiles(dropRequest, requestor); + } + + @Override + public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { + queue.setPriorities(newPriorities); + } + + @Override + public synchronized void start(final FlowFilePartitioner partitionerUsed) { + stopped = false; + rebalanceFromQueue(); + } + + @Override + public synchronized void stop() { + stopped = true; + + if (this.rebalanceTask != null) { + this.rebalanceTask.stop(); + } + + this.rebalanceTask = null; + } + + private synchronized void rebalanceFromQueue() { + if (stopped) { + return; + } + + // If a task is already defined, do nothing. There's already a thread running. 
+ if (rebalanceTask != null) { + return; + } + + this.rebalanceTask = new RebalanceTask(); + + final Thread rebalanceThread = new Thread(this.rebalanceTask); + rebalanceThread.setName("Rebalance queued data for Connection " + queueIdentifier); + rebalanceThread.start(); + } + + @Override + public void rebalance(final FlowFileQueueContents queueContents) { + if (queueContents.getActiveFlowFiles().isEmpty() && queueContents.getSwapLocations().isEmpty()) { + return; + } + + queue.inheritQueueContents(queueContents); + rebalanceFromQueue(); + } + + @Override + public void rebalance(final Collection<FlowFileRecord> flowFiles) { + queue.putAll(flowFiles); + rebalanceFromQueue(); + } + + @Override + public FlowFileQueueContents packageForRebalance(String newPartitionName) { + return queue.packageForRebalance(newPartitionName); + } + + private synchronized boolean complete() { + if (!queue.isEmpty()) { + return false; + } + + this.rebalanceTask = null; + return true; + } + + + private class RebalanceTask implements Runnable { + private volatile boolean stopped = false; + private final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + private final long pollWaitMillis = 100L; + + public void stop() { + stopped = true; + } + + @Override + public void run() { + while (!stopped) { + final FlowFileRecord polled; + + expiredRecords.clear(); + + // Wait up to #pollWaitMillis milliseconds to get a FlowFile. If none, then check if stopped + // and if not, poll again. + try { + polled = queue.poll(expiredRecords, -1, pollWaitMillis); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + continue; + } + + if (polled == null) { + flowFileQueue.handleExpiredRecords(expiredRecords); + + if (complete()) { + return; + } else { + continue; + } + } + + // We got 1 FlowFile. Try a second poll to obtain up to 999 more (for a total of 1,000). 
+ final List<FlowFileRecord> toDistribute = new ArrayList<>(); + toDistribute.add(polled); + + final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1); + toDistribute.addAll(additionalRecords); + + flowFileQueue.handleExpiredRecords(expiredRecords); + + // Transfer all of the FlowFiles that we got back to the FlowFileQueue itself. This will cause the data to be + // re-partitioned and binned appropriately. We also then need to ensure that we acknowledge the data from our + // own SwappablePriorityQueue to ensure that the sizes are kept in check. + flowFileQueue.distributeToPartitions(toDistribute); + queue.acknowledge(toDistribute); + } + } + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java new file mode 100644 index 0000000..e5e64d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.DropFlowFileAction; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.SwappablePriorityQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * A Local Queue Partition that whose implementation is based on the use of a {@link SwappablePriorityQueue}. 
+ */ +public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition { + private static final String SWAP_PARTITION_NAME = "local"; + + private final SwappablePriorityQueue priorityQueue; + private final FlowFileQueue flowFileQueue; + private final String description; + + public SwappablePriorityQueueLocalPartition(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, + final FlowFileQueue flowFileQueue, final DropFlowFileAction dropAction) { + this.priorityQueue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, SWAP_PARTITION_NAME); + this.flowFileQueue = flowFileQueue; + this.description = "SwappablePriorityQueueLocalPartition[queueId=" + flowFileQueue.getIdentifier() + "]"; + } + + @Override + public String getSwapPartitionName() { + return SWAP_PARTITION_NAME; + } + + @Override + public QueueSize size() { + return priorityQueue.size(); + } + + @Override + public boolean isUnacknowledgedFlowFile() { + return priorityQueue.isUnacknowledgedFlowFile(); + } + + @Override + public Optional<NodeIdentifier> getNodeIdentifier() { + return Optional.empty(); + } + + @Override + public void put(final FlowFileRecord flowFile) { + priorityQueue.put(flowFile); + } + + @Override + public void putAll(final Collection<FlowFileRecord> flowFiles) { + priorityQueue.putAll(flowFiles); + } + + @Override + public boolean isActiveQueueEmpty() { + return priorityQueue.isActiveQueueEmpty(); + } + + @Override + public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) { + return priorityQueue.poll(expiredRecords, getExpiration()); + } + + @Override + public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) { + return priorityQueue.poll(maxResults, expiredRecords, getExpiration()); + } + + @Override + public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { + return 
priorityQueue.poll(filter, expiredRecords, getExpiration()); + } + + private int getExpiration() { + return flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS); + } + + @Override + public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException { + return priorityQueue.getFlowFile(flowFileUuid); + } + + @Override + public List<FlowFileRecord> getListableFlowFiles() { + return priorityQueue.getActiveFlowFiles(); + } + + @Override + public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) { + priorityQueue.dropFlowFiles(dropRequest, requestor); + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + return priorityQueue.recoverSwappedFlowFiles(); + } + + @Override + public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { + priorityQueue.setPriorities(newPriorities); + } + + @Override + public void acknowledge(final FlowFileRecord flowFile) { + priorityQueue.acknowledge(flowFile); + } + + @Override + public void acknowledge(final Collection<FlowFileRecord> flowFiles) { + priorityQueue.acknowledge(flowFiles); + } + + @Override + public LocalQueuePartitionDiagnostics getQueueDiagnostics() { + return priorityQueue.getQueueDiagnostics(); + } + + @Override + public FlowFileQueueContents packageForRebalance(String newPartitionName) { + return priorityQueue.packageForRebalance(newPartitionName); + } + + @Override + public void start(final FlowFilePartitioner partitionerUsed) { + } + + @Override + public void stop() { + } + + @Override + public void inheritQueueContents(final FlowFileQueueContents queueContents) { + priorityQueue.inheritQueueContents(queueContents); + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java 
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
// new file mode 100644
// index 0000000..5b02f13
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
// @@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue.clustered.protocol;

/**
 * Integer codes used by the load balance protocol, grouped by the part of the exchange they belong to.
 * All values are compile-time constants.
 */
public class LoadBalanceProtocolConstants {
    // Protocol negotiation constants
    public static final int VERSION_ACCEPTED = 0x10;
    public static final int REQUEST_DIFFERENT_VERSION = 0x11;
    /**
     * Misspelled original name of {@link #REQUEST_DIFFERENT_VERSION}, retained so existing callers keep compiling.
     *
     * @deprecated use {@link #REQUEST_DIFFERENT_VERSION} instead
     */
    @Deprecated
    public static final int REQEUST_DIFFERENT_VERSION = REQUEST_DIFFERENT_VERSION;
    public static final int ABORT_PROTOCOL_NEGOTIATION = 0x12;

    // Transaction constants
    public static final int CONFIRM_CHECKSUM = 0x21;
    public static final int REJECT_CHECKSUM = 0x22;
    public static final int COMPLETE_TRANSACTION = 0x23;
    public static final int ABORT_TRANSACTION = 0x24;
    public static final int CONFIRM_COMPLETE_TRANSACTION = 0x25;

    // FlowFile constants
    public static final int MORE_FLOWFILES = 0x31;
    public static final int NO_MORE_FLOWFILES = 0x32;

    // Backpressure / Space constants
    public static final int CHECK_SPACE = 0x61;
    public static final int SKIP_SPACE_CHECK = 0x62;
    public static final int SPACE_AVAILABLE = 0x65;
    public static final int QUEUE_FULL = 0x66;

    // data frame constants
    public static final int NO_DATA_FRAME = 0x40;
    public static final int DATA_FRAME_FOLLOWS = 0x42;
}

// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
// new file mode 100644
// index 0000000..43187b5
// --- /dev/null
// +++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer { + private static final Logger logger = LoggerFactory.getLogger(ClusterLoadBalanceAuthorizer.class); + + private final ClusterCoordinator clusterCoordinator; + private final EventReporter eventReporter; + + public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) { + this.clusterCoordinator = clusterCoordinator; + this.eventReporter = eventReporter; + } + + @Override + public void authorize(final Collection<String> 
clientIdentities) throws NotAuthorizedException { + if (clientIdentities == null) { + logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing"); + return; + } + + final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream() + .map(NodeIdentifier::getApiAddress) + .collect(Collectors.toSet()); + + for (final String clientId : clientIdentities) { + if (nodeIds.contains(clientId)) { + logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId); + return; + } + } + + final String message = String.format("Authorization failed for Client ID's %s to Load Balance data because none of the ID's are known Cluster Node Identifiers", + clientIdentities); + + logger.warn(message); + eventReporter.reportEvent(Severity.WARNING, "Load Balanced Connections", message); + throw new NotAuthorizedException("Client ID's " + clientIdentities + " are not authorized to Load Balance data"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java new file mode 100644 index 0000000..93fc2d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java @@ -0,0 +1,251 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + + +public class ConnectionLoadBalanceServer { + private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class); + + private final String hostname; + private final int port; + private final SSLContext sslContext; + private final ExecutorService threadPool; + private final LoadBalanceProtocol loadBalanceProtocol; + private final int connectionTimeoutMillis; + private final int numThreads; + private final EventReporter 
eventReporter; + + private volatile Set<CommunicateAction> communicationActions = Collections.emptySet(); + private final BlockingQueue<Socket> connectionQueue = new LinkedBlockingQueue<>(); + + private volatile AcceptConnection acceptConnection; + private volatile ServerSocket serverSocket; + private volatile boolean stopped = true; + + public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext, final int numThreads, final LoadBalanceProtocol loadBalanceProtocol, + final EventReporter eventReporter, final int connectionTimeoutMillis) { + this.hostname = hostname; + this.port = port; + this.sslContext = sslContext; + this.loadBalanceProtocol = loadBalanceProtocol; + this.connectionTimeoutMillis = connectionTimeoutMillis; + this.numThreads = numThreads; + this.eventReporter = eventReporter; + + threadPool = new FlowEngine(numThreads, "Load Balance Server"); + } + + public void start() throws IOException { + if (!stopped) { + return; + } + + stopped = false; + if (serverSocket != null) { + return; + } + + try { + serverSocket = createServerSocket(); + } catch (final Exception e) { + throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. 
Please verify the values of the " + + "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e); + } + + final Set<CommunicateAction> actions = new HashSet<>(numThreads); + for (int i=0; i < numThreads; i++) { + final CommunicateAction action = new CommunicateAction(loadBalanceProtocol); + actions.add(action); + threadPool.submit(action); + } + + this.communicationActions = actions; + + acceptConnection = new AcceptConnection(serverSocket); + final Thread receiveConnectionThread = new Thread(acceptConnection); + receiveConnectionThread.setName("Receive Queue Load-Balancing Connections"); + receiveConnectionThread.start(); + } + + public int getPort() { + return serverSocket.getLocalPort(); + } + + public void stop() { + stopped = false; + threadPool.shutdown(); + + if (acceptConnection != null) { + acceptConnection.stop(); + } + + communicationActions.forEach(CommunicateAction::stop); + + Socket socket; + while ((socket = connectionQueue.poll()) != null) { + try { + socket.close(); + logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress()); + } catch (final IOException ioe) { + logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe); + } + } + } + + private ServerSocket createServerSocket() throws IOException { + final InetAddress inetAddress = hostname == null ? 
null : InetAddress.getByName(hostname); + + if (sslContext == null) { + return new ServerSocket(port, 50, InetAddress.getByName(hostname)); + } else { + final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress); + ((SSLServerSocket) serverSocket).setNeedClientAuth(true); + return serverSocket; + } + } + + + private class CommunicateAction implements Runnable { + private final LoadBalanceProtocol loadBalanceProtocol; + private volatile boolean stopped = false; + + public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol) { + this.loadBalanceProtocol = loadBalanceProtocol; + } + + public void stop() { + this.stopped = true; + } + + @Override + public void run() { + String peerDescription = "<Unknown Client>"; + + while (!stopped) { + Socket socket = null; + try { + socket = connectionQueue.poll(1, TimeUnit.SECONDS); + if (socket == null) { + continue; + } + + peerDescription = socket.getRemoteSocketAddress().toString(); + + if (socket.isClosed()) { + logger.debug("Connection to Peer {} is closed. Will not attempt to communicate over this Socket.", peerDescription); + continue; + } + + logger.debug("Receiving FlowFiles from Peer {}", peerDescription); + loadBalanceProtocol.receiveFlowFiles(socket); + + if (socket.isConnected()) { + logger.debug("Finished receiving FlowFiles from Peer {}. Will recycle connection.", peerDescription); + connectionQueue.offer(socket); + } else { + logger.debug("Finished receiving FlowFiles from Peer {}. 
Socket is no longer connected so will not recycle connection.", peerDescription); + } + } catch (final Exception e) { + if (socket != null) { + try { + socket.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + } + + logger.error("Failed to communicate with Peer {}", peerDescription, e); + eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e); + } + } + + logger.info("Connection Load Balance Server shutdown. Will no longer handle incoming requests."); + } + } + + + private class AcceptConnection implements Runnable { + private final ServerSocket serverSocket; + private volatile boolean stopped = false; + + public AcceptConnection(final ServerSocket serverSocket) { + this.serverSocket = serverSocket; + } + + public void stop() { + stopped = true; + } + + @Override + public void run() { + try { + serverSocket.setSoTimeout(1000); + } catch (final Exception e) { + logger.error("Failed to set soTimeout on Server Socket for Load Balancing data across cluster", e); + } + + while (!stopped) { + try { + final Socket socket; + try { + socket = serverSocket.accept(); + } catch (final SocketTimeoutException ste) { + continue; + } + + socket.setSoTimeout(connectionTimeoutMillis); + connectionQueue.offer(socket); + } catch (final Exception e) { + logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e); + } + } + + try { + serverSocket.close(); + } catch (final Exception e) { + logger.warn("Failed to properly shutdown Server Socket for Load Balancing", e); + } + } + } + + @Override + public String toString() { + return "ConnectionLoadBalanceServer[hostname=" + hostname + ", port=" + port + ", secure=" + (sslContext != null) + "]"; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java new file mode 100644 index 0000000..3a716e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.server; + +import java.util.Collection; + +public interface LoadBalanceAuthorizer { + void authorize(Collection<String> clientIdentities) throws NotAuthorizedException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java new file mode 100644 index 0000000..5a74ebc --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Server-side protocol used to receive FlowFiles from a peer over an established socket.
 */
public interface LoadBalanceProtocol {

    /**
     * Receives FlowFiles from the peer on the other end of the given socket.
     *
     * @param socket the socket used both for reading from and writing to the peer
     *
     * @throws TransactionAbortedException if the transaction was aborted
     * @throws IOException if communication with the peer fails
     */
    void receiveFlowFiles(Socket socket) throws IOException;

}
/**
 * Signals that a client is not authorized. It is an {@link IOException}, so code that handles
 * {@code IOException} also handles this failure.
 */
public class NotAuthorizedException extends IOException {

    /**
     * @param message description of why authorization was denied
     */
    public NotAuthorizedException(final String message) {
        super(message);
    }
}
