[
https://issues.apache.org/jira/browse/NIFI-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622637#comment-16622637
]
ASF GitHub Bot commented on NIFI-5585:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3010#discussion_r219297604
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Returns remote partitions when queried for a partition; never returns the {@link LocalQueuePartition}.
+ */
+public class NonLocalPartitionPartitioner implements FlowFilePartitioner {
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ @Override
+ public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+ QueuePartition remotePartition = null;
+ for (int i = 0, numPartitions = partitions.length; i < numPartitions; i++) {
+ final long count = counter.getAndIncrement();
--- End diff --
I think the logic here is a bit off. The counter is shared by all threads, so if
we have, say, 5 partitions but many threads, the first iteration could get
count = 3, which maps to index 3, the local partition. Other threads may
increment the counter in the meantime, so the next iteration could get
count = 8, which is again index 3. The next iteration gets count = 13 (index 3),
then count = 18, then count = 23. At that point we would have pointed to the
local partition 5 times in a row and thrown an IllegalStateException.
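To make the interleaving concrete, here is a deterministic replay of it. This is a hypothetical harness, not NiFi code: the class InterleavingReplay and its method are made up for illustration. It uses 5 partitions with index 3 as the local partition and simulates four other threads each calling getAndIncrement() between this thread's iterations, which is the per-iteration pattern in the diff.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical harness: replays the per-iteration getAndIncrement() pattern from the diff,
// with other threads advancing the shared counter between our iterations.
class InterleavingReplay {
    static final int NUM_PARTITIONS = 5;

    /** Indexes probed by one call when `otherThreads` increments happen between iterations. */
    static int[] indexesTried(final AtomicLong counter, final int otherThreads) {
        final int[] tried = new int[NUM_PARTITIONS];
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            final long count = counter.getAndIncrement(); // this thread's increment
            tried[i] = (int) (count % NUM_PARTITIONS);
            // Simulate the other threads taking counter values before our next iteration.
            for (int t = 0; t < otherThreads; t++) {
                counter.getAndIncrement();
            }
        }
        return tried;
    }

    public static void main(final String[] args) {
        // Counts seen by this thread: 3, 8, 13, 18, 23 -> all map to index 3 (the local partition)
        System.out.println(Arrays.toString(indexesTried(new AtomicLong(3L), 4)));
        // prints [3, 3, 3, 3, 3]
    }
}
```

With otherThreads = 0 the same call probes indexes 3, 4, 0, 1, 2, so the flaw only appears under contention.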
Instead, I think we should do something like:
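{code}
// Take a single counter value per call; the loop then probes consecutive indexes from there
final long startIndex = counter.getAndIncrement();
for (int i = 0; i < partitions.length; i++) {
    // Apply the modulo to the long before narrowing to int; casting first could yield a
    // negative index once the counter exceeds Integer.MAX_VALUE
    final int index = (int) ((startIndex + i) % partitions.length);
    final QueuePartition partition = partitions[index];
    ...
}
{code}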
This way, we still get the round-robin behavior but are guaranteed to try
every partition before throwing an IllegalStateException.
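A self-contained sketch of the suggested loop, filled in under stated assumptions: it is generic over the partition type so it runs without NiFi on the classpath, the class name RoundRobinNonLocalSelector is hypothetical, and the local partition is recognized by identity (assuming the caller passes the same instance that appears in the array). It applies the modulo to the long before narrowing to int.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of the reviewer's suggestion: take one counter value per call and probe
 * indexes start, start + 1, ... modulo n, skipping the local partition.
 */
class RoundRobinNonLocalSelector<P> {
    private final AtomicLong counter = new AtomicLong(0L);

    P select(final P[] partitions, final P localPartition) {
        // Each call takes exactly one counter value, so increments made by other
        // threads cannot change which indexes this call probes.
        final long startIndex = counter.getAndIncrement();
        for (int i = 0; i < partitions.length; i++) {
            // Modulo on the long first, then narrow: the index is always in [0, n)
            final int index = (int) ((startIndex + i) % partitions.length);
            final P partition = partitions[index];
            if (partition != localPartition) { // identity check (assumption, see lead-in)
                return partition;
            }
        }
        // Every index was probed and all of them were the local partition (or there were none)
        throw new IllegalStateException("No remote partition among " + partitions.length + " partitions");
    }
}
```

Because a call whose start index lands on the local partition falls through to the next index, the partition right after the local one receives roughly twice as many picks as the others; that skew is inherent to this probing scheme.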
> Decommission Nodes from Cluster
> -------------------------------
>
> Key: NIFI-5585
> URL: https://issues.apache.org/jira/browse/NIFI-5585
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 1.7.1
> Reporter: Jeff Storck
> Assignee: Jeff Storck
> Priority: Major
>
> Allow a node in the cluster to be decommissioned, rebalancing the flowfiles
> on that node to the other active nodes. This work depends on NIFI-5516.
> Just as the client sends a DISCONNECTING message as a PUT request to
> cluster/nodes/\{id}, a DECOMMISSIONING message can be sent as a PUT request
> to the same URI to initiate decommissioning of a DISCONNECTED node. The
> DECOMMISSIONING request will be idempotent.
> The steps to decommission a node and remove it from the cluster are:
> # Send a request to disconnect the node.
> # Once the disconnect completes, send a request to decommission the node.
> # Once the decommission completes, send a request to delete the node.
> If an error occurs and the node cannot complete decommissioning, the user
> can:
> # Send a request to delete the node from the cluster.
> # Diagnose why the node had issues with the decommission (out of memory, no
> network connection, etc.) and address the issue.
> # Restart NiFi on the node so that it will reconnect to the cluster.
> # Go through the steps to decommission and remove the node.
> Toolkit CLI commands for retrieving a list of nodes and
> disconnecting/decommissioning/deleting nodes have been added.
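One way the idempotent DECOMMISSIONING request described in the issue could be handled, as a minimal sketch: the NodeState enum, the DecommissionRequests class, and the CONNECTED and DECOMMISSIONED states are hypothetical (only DISCONNECTING, DISCONNECTED and DECOMMISSIONING come from the issue). A repeated request leaves the state unchanged, and only a DISCONNECTED node may start decommissioning.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical states: DISCONNECTING, DISCONNECTED and DECOMMISSIONING come from the issue;
// CONNECTED and DECOMMISSIONED are assumed for this sketch.
enum NodeState { CONNECTED, DISCONNECTING, DISCONNECTED, DECOMMISSIONING, DECOMMISSIONED }

class DecommissionRequests {
    private final Map<String, NodeState> states = new ConcurrentHashMap<>();

    void setState(final String nodeId, final NodeState state) {
        states.put(nodeId, state);
    }

    NodeState getState(final String nodeId) {
        return states.get(nodeId);
    }

    /**
     * Handles a PUT of DECOMMISSIONING for a node. Returns true if this call started
     * decommissioning, false if the node was already decommissioning or decommissioned.
     */
    boolean requestDecommission(final String nodeId) {
        final boolean[] started = {false};
        // compute() makes the check-and-transition atomic for this node
        states.compute(nodeId, (id, current) -> {
            if (current == null) {
                throw new IllegalArgumentException("Unknown node: " + id);
            }
            switch (current) {
                case DISCONNECTED:
                    started[0] = true;
                    return NodeState.DECOMMISSIONING;
                case DECOMMISSIONING:
                case DECOMMISSIONED:
                    return current; // repeated request: no state change (idempotent)
                default:
                    throw new IllegalStateException("Node " + id + " must be DISCONNECTED but is " + current);
            }
        });
        return started[0];
    }
}
```

If the remapping function throws, ConcurrentHashMap leaves the existing mapping untouched, so a rejected request does not alter the node's state.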
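The three-step procedure in the issue (disconnect, decommission, delete) could be driven by a client along these lines. This is a sketch only: NodeClient, DecommissionWorkflow and their methods are hypothetical stand-ins, not the NiFi REST or Toolkit CLI API, and the "DECOMMISSIONED" status string and the polling limits are assumptions.

```java
// Hypothetical client-side operations; not the actual NiFi REST or Toolkit CLI API.
interface NodeClient {
    void requestDisconnect(String nodeId);
    void requestDecommission(String nodeId);
    void deleteNode(String nodeId);
    String status(String nodeId); // assumed values such as "DISCONNECTED", "DECOMMISSIONED"
}

class DecommissionWorkflow {
    /** Polls until the node reports the expected status, or fails after maxPolls attempts. */
    static void awaitStatus(final NodeClient client, final String nodeId, final String expected,
                            final int maxPolls, final long sleepMillis) {
        for (int i = 0; i < maxPolls; i++) {
            if (expected.equals(client.status(nodeId))) {
                return;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new IllegalStateException("Interrupted while waiting for " + nodeId, e);
            }
        }
        throw new IllegalStateException("Node " + nodeId + " did not reach " + expected);
    }

    /** Disconnect, then decommission, then delete, waiting for each step to complete. */
    static void decommissionAndRemove(final NodeClient client, final String nodeId) {
        client.requestDisconnect(nodeId);
        awaitStatus(client, nodeId, "DISCONNECTED", 60, 1_000L);
        client.requestDecommission(nodeId);
        awaitStatus(client, nodeId, "DECOMMISSIONED", 600, 1_000L); // limits are arbitrary
        client.deleteNode(nodeId);
    }
}
```

If a step times out, the IllegalStateException is raised before deleteNode is called, leaving the user to follow the issue's recovery steps.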
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)