Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2947#discussion_r220440614
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
+import
org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.SwappablePriorityQueue;
+import
org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
+import
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import
org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
+import
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A Queue Partition that is responsible for transferring FlowFiles to
another node in the cluster
+ */
+public class RemoteQueuePartition implements QueuePartition {
+ private static final Logger logger =
LoggerFactory.getLogger(RemoteQueuePartition.class);
+
+ private final NodeIdentifier nodeIdentifier;
+ private final SwappablePriorityQueue priorityQueue;
+ private final LoadBalancedFlowFileQueue flowFileQueue;
+ private final TransferFailureDestination failureDestination;
+
+ private final FlowFileRepository flowFileRepo;
+ private final ProvenanceEventRepository provRepo;
+ private final ContentRepository contentRepo;
+ private final AsyncLoadBalanceClientRegistry clientRegistry;
+
+ private boolean running = false;
+ private final String description;
+
+ public RemoteQueuePartition(final NodeIdentifier nodeId, final
SwappablePriorityQueue priorityQueue, final TransferFailureDestination
failureDestination,
+ final FlowFileRepository flowFileRepo,
final ProvenanceEventRepository provRepo, final ContentRepository
contentRepository,
+ final AsyncLoadBalanceClientRegistry
clientRegistry, final LoadBalancedFlowFileQueue flowFileQueue) {
+
+ this.nodeIdentifier = nodeId;
+ this.priorityQueue = priorityQueue;
+ this.flowFileQueue = flowFileQueue;
+ this.failureDestination = failureDestination;
+ this.flowFileRepo = flowFileRepo;
+ this.provRepo = provRepo;
+ this.contentRepo = contentRepository;
+ this.clientRegistry = clientRegistry;
+ this.description = "RemoteQueuePartition[queueId=" +
flowFileQueue.getIdentifier() + ", nodeId=" + nodeIdentifier + "]";
+ }
+
+ @Override
+ public QueueSize size() {
+ return priorityQueue.size();
+ }
+
+ @Override
+ public String getSwapPartitionName() {
+ return nodeIdentifier.getId();
+ }
+
+ @Override
+ public Optional<NodeIdentifier> getNodeIdentifier() {
+ return Optional.ofNullable(nodeIdentifier);
+ }
+
+ @Override
+ public void put(final FlowFileRecord flowFile) {
+ priorityQueue.put(flowFile);
+ }
+
+ @Override
+ public void putAll(final Collection<FlowFileRecord> flowFiles) {
+ priorityQueue.putAll(flowFiles);
+ }
+
+ @Override
+ public void dropFlowFiles(final DropFlowFileRequest dropRequest, final
String requestor) {
+ priorityQueue.dropFlowFiles(dropRequest, requestor);
+ }
+
+ @Override
+ public SwapSummary recoverSwappedFlowFiles() {
+ return priorityQueue.recoverSwappedFlowFiles();
+ }
+
+ @Override
+ public FlowFileQueueContents packageForRebalance(String
newPartitionName) {
+ return priorityQueue.packageForRebalance(newPartitionName);
+ }
+
+ @Override
+ public void setPriorities(final List<FlowFilePrioritizer>
newPriorities) {
+ priorityQueue.setPriorities(newPriorities);
+ }
+
+ private FlowFileRecord getFlowFile() {
+ final Set<FlowFileRecord> expired = new HashSet<>();
+ final FlowFileRecord flowFile = priorityQueue.poll(expired,
flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
+ flowFileQueue.handleExpiredRecords(expired);
+ return flowFile;
+ }
+
+ @Override
+ public synchronized void start(final FlowFilePartitioner partitioner) {
+ if (running) {
+ return;
+ }
+
+ final TransactionFailureCallback failureCallback = new
TransactionFailureCallback() {
+ @Override
+ public void onTransactionFailed(final List<FlowFileRecord>
flowFiles, final Exception cause, final TransactionPhase phase) {
+ if (cause instanceof ContentNotFoundException) {
+ // Handle ContentNotFound by creating a
RepositoryRecord for the FlowFile and marking as aborted, then updating the
+ // FlowFiles and Provenance Repositories accordingly.
This follows the same pattern as StandardProcessSession so that
+ // we have a consistent way of handling this case.
+ final Optional<FlowFileRecord> optionalFlowFile =
((ContentNotFoundException) cause).getFlowFile();
+ if (optionalFlowFile.isPresent()) {
+ final List<FlowFileRecord> successfulFlowFiles =
new ArrayList<>(flowFiles);
+
+ final FlowFileRecord flowFile =
optionalFlowFile.get();
+ successfulFlowFiles.remove(flowFile);
+
+ final StandardRepositoryRecord repoRecord = new
StandardRepositoryRecord(flowFileQueue, flowFile);
+ repoRecord.markForAbort();
+
+ updateRepositories(Collections.emptyList(),
Collections.singleton(repoRecord));
+
+ // If unable to even connect to the node, go ahead
and transfer all FlowFiles for this queue to the failure destination.
+ // Otherwise, transfer just those FlowFiles that
we failed to send.
+ if (phase == TransactionPhase.CONNECTING) {
+
failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
+ } else {
+ failureDestination.putAll(successfulFlowFiles,
partitioner);
+ }
+
+ return;
+ }
+ }
+
+ // If unable to even connect to the node, go ahead and
transfer all FlowFiles for this queue to the failure destination.
+ // Otherwise, transfer just those FlowFiles that we failed
to send.
+ if (phase == TransactionPhase.CONNECTING) {
+
failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
+ } else {
+ failureDestination.putAll(flowFiles, partitioner);
+ }
+ }
+ };
+
+ final TransactionCompleteCallback successCallback = new
TransactionCompleteCallback() {
+ @Override
+ public void onTransactionComplete(final List<FlowFileRecord>
flowFilesSent) {
+ // We've now completed the transaction. We must now update
the repositories and "keep the books", acknowledging the FlowFiles
+ // with the queue so that its size remains accurate.
+ updateRepositories(flowFilesSent, Collections.emptyList());
+ priorityQueue.acknowledge(flowFilesSent);
--- End diff --
Whether this `priorityQueue` is empty is what `priorityQueue::isEmpty` below uses
to decide that there are no more FlowFiles to send. After I encountered a transfer
failure, this priorityQueue never became empty, because the `acknowledge` method is
only called here in `onTransactionComplete`. As a result, the following message was
logged continuously. This means empty FlowFile lists were being sent over and over
even though there was nothing to send: `priorityQueue::isEmpty` returns false, so
NiFi thinks there are more FlowFiles to load-balance:
```
o.a.n.c.q.c.SocketLoadBalancedFlowFileQueue Received the following
FlowFiles from Peer: []. Will accept FlowFiles to the local partition
```
Shouldn't we call `acknowledge` from `failureCallback`, too?
---