GitHub user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r220454819
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.client.async.nio;
    +
    +import org.apache.nifi.cluster.coordination.ClusterCoordinator;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.events.EventReporter;
    +import org.apache.nifi.reporting.Severity;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class NioAsyncLoadBalanceClientTask implements Runnable {
    +    private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
    +    private static final String EVENT_CATEGORY = "Load-Balanced Connection";
    +
    +    private final NioAsyncLoadBalanceClientRegistry clientRegistry;
    +    private final ClusterCoordinator clusterCoordinator;
    +    private final EventReporter eventReporter;
    +    private volatile boolean running = true;
    +
    +    public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
    +        this.clientRegistry = clientRegistry;
    +        this.clusterCoordinator = clusterCoordinator;
    +        this.eventReporter = eventReporter;
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (running) {
    +            try {
    +                boolean success = false;
    +                for (final NioAsyncLoadBalanceClient client : clientRegistry.getAllClients()) {
    +                    if (!client.isRunning()) {
    +                        logger.trace("Client {} is not running so will not communicate with it", client);
    +                        continue;
    +                    }
    +
    +                    if (client.isPenalized()) {
    +                        logger.trace("Client {} is penalized so will not communicate with it", client);
    +                        continue;
    +                    }
    +
    +                    final NodeIdentifier clientNodeId = client.getNodeIdentifier();
    +                    final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(clientNodeId);
    +                    final NodeConnectionState connectionState = connectionStatus.getState();
    +                    if (connectionState != NodeConnectionState.CONNECTED) {
    --- End diff --
    
    Do we want to rebalance queued FlowFiles here if the strategy allows it? 
FlowFilePartitioner.isRebalanceOnFailure is only evaluated when there is 
communication activity. After a node goes down and a remaining node updates the 
connection status to DISCONNECTED, this branch stops further processing. As a 
result, FlowFiles stay queued in the connection even though the strategy allows 
rebalancing.
    
    Scenario: in a 2-node cluster using RoundRobin, FlowFiles are created 
continuously and distributed evenly. Then one node goes down. 50% of the 
generated FlowFiles will stay in the connection.
    
    With CorrelationAttributePartitioner, this is the expected behavior. With 
the RoundRobin partitioner, however, I expect the FlowFiles to be processed on 
the remaining node in this case.
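    
    To make the scenario concrete, here is a minimal stand-alone sketch. It is 
not NiFi code: the class RebalanceSketch, the method stranded, and the model 
itself (one partition per node, a connected node fully drains its own 
partition) are assumptions made purely for illustration. It only counts how 
many FlowFiles stay queued for a disconnected node, with and without 
rebalancing.

```java
import java.util.Arrays;

// Hypothetical illustration only; none of these names exist in NiFi.
final class RebalanceSketch {

    /**
     * Assigns flowFileCount FlowFiles to partitions round-robin (one partition per node)
     * and returns how many remain stranded in each partition's queue.
     * Assumption: a connected node drains its own partition completely.
     */
    static int[] stranded(final int flowFileCount, final boolean[] connected, final boolean rebalanceOnFailure) {
        final int nodes = connected.length;
        final int[] queued = new int[nodes];
        for (int i = 0; i < flowFileCount; i++) {
            queued[i % nodes]++; // round-robin assignment
        }

        boolean anyConnected = false;
        for (final boolean c : connected) {
            anyConnected |= c;
        }

        final int[] result = new int[nodes];
        for (int n = 0; n < nodes; n++) {
            if (connected[n]) {
                continue; // drained by its own node, nothing stranded
            }
            // Partition of a disconnected node: its FlowFiles can only leave the queue
            // if rebalancing is allowed and some other node is still connected.
            final boolean canMove = rebalanceOnFailure && anyConnected;
            result[n] = canMove ? 0 : queued[n];
        }
        return result;
    }

    public static void main(final String[] args) {
        final boolean[] connected = {true, false}; // second node went down
        System.out.println(Arrays.toString(stranded(100, connected, false))); // [0, 50]
        System.out.println(Arrays.toString(stranded(100, connected, true)));  // [0, 0]
    }
}
```

    The first call corresponds to what I observe today with RoundRobin (half 
of the FlowFiles stay in the connection); the second is the behavior I would 
expect when the strategy allows rebalancing.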


---
