This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3511ce3  NIFI-9548: When disabling RPG transmission, wait for the 
ports to complete in a background thread instead of blocking the web thread. 
Also moved the RPG initialization logic into flow controller instead of flow 
service and added a delay in order to reduce likelihood of ConnectException 
happening when pointing to nodes in the same cluster
3511ce3 is described below

commit 3511ce3d132f18fffd6e2a7aaf14962314d556ef
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 6 16:23:26 2022 -0500

    NIFI-9548: When disabling RPG transmission, wait for the ports to complete 
in a background thread instead of blocking the web thread. Also moved the RPG 
initialization logic into flow controller instead of flow service and added a 
delay in order to reduce likelihood of ConnectException happening when 
pointing to nodes in the same cluster
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5641.
---
 .../nifi/remote/StandardRemoteProcessGroup.java    | 37 ++++++++++++++++++----
 .../apache/nifi/reporting/AbstractEventAccess.java |  2 +-
 .../org/apache/nifi/groups/RemoteProcessGroup.java | 12 +++++--
 .../org/apache/nifi/controller/FlowController.java |  2 ++
 .../nifi/controller/StandardFlowService.java       | 10 ++++--
 .../serialization/VersionedFlowSynchronizer.java   |  3 --
 .../nifi/remote/StandardRemoteGroupPort.java       | 18 ++++++++---
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  2 +-
 .../dao/impl/StandardRemoteProcessGroupDAO.java    |  4 +--
 9 files changed, 66 insertions(+), 24 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 4f031ab..3c0954d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -69,6 +69,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     private final AtomicReference<String> comments = new AtomicReference<>();
     private final AtomicReference<ProcessGroup> processGroup;
     private final AtomicBoolean transmitting = new AtomicBoolean(false);
+    private final AtomicBoolean configuredToTransmit = new 
AtomicBoolean(false);
     private final AtomicReference<String> versionedComponentId = new 
AtomicReference<>();
     private final SSLContext sslContext;
 
@@ -183,7 +185,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         }
 
         initialized = true;
-        backgroundThreadExecutor.submit(() -> {
+        backgroundThreadExecutor.schedule(() -> {
             try {
                 refreshFlowContents();
             } catch (final Exception e) {
@@ -194,7 +196,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     logger.warn("Unable to communicate with remote instance 
{}", this, e);
                 }
             }
-        });
+        }, 3, TimeUnit.SECONDS);
 
         final Runnable checkAuthorizations = new InitializationTask();
         backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 
0L, 60L, TimeUnit.SECONDS);
@@ -1043,6 +1045,11 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     }
 
     @Override
+    public boolean isConfiguredToTransmit() {
+        return configuredToTransmit.get();
+    }
+
+    @Override
     public void startTransmitting() {
         writeLock.lock();
         try {
@@ -1063,6 +1070,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             }
 
             transmitting.set(true);
+            configuredToTransmit.set(true);
         } finally {
             writeLock.unlock();
         }
@@ -1081,13 +1089,14 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             scheduler.startPort(port);
 
             transmitting.set(true);
+            configuredToTransmit.set(true);
         } finally {
             writeLock.unlock();
         }
     }
 
     @Override
-    public void stopTransmitting() {
+    public Future<?> stopTransmitting() {
         writeLock.lock();
         try {
             verifyCanStopTransmitting();
@@ -1100,12 +1109,24 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                 scheduler.stopPort(port);
             }
 
-            // Wait for the ports to stop
+            configuredToTransmit.set(false);
+
+            return scheduler.submitFrameworkTask(this::waitForPortShutdown);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private void waitForPortShutdown() {
+        // Wait for the ports to stop
+        try {
             for (final RemoteGroupPort port : getInputPorts()) {
                 while (port.isRunning()) {
                     try {
                         Thread.sleep(50L);
                     } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
                     }
                 }
             }
@@ -1115,13 +1136,13 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     try {
                         Thread.sleep(50L);
                     } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
                     }
                 }
             }
-
-            transmitting.set(false);
         } finally {
-            writeLock.unlock();
+            transmitting.set(false);
         }
     }
 
@@ -1142,6 +1163,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                 try {
                     Thread.sleep(50L);
                 } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
             }
 
@@ -1163,6 +1185,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                 }
             }
 
+            configuredToTransmit.set(stillTransmitting);
             transmitting.set(stillTransmitting);
         } finally {
             writeLock.unlock();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index e98a3f7..51d62d7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -550,7 +550,7 @@ public abstract class AbstractEventAccess implements 
EventAccess {
         }
 
         status.setId(remoteGroup.getIdentifier());
-        status.setTransmissionStatus(remoteGroup.isTransmitting() ? 
TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
+        status.setTransmissionStatus(remoteGroup.isConfiguredToTransmit() ? 
TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
         status.setActiveThreadCount(activeThreadCount);
         status.setReceivedContentSize(receivedContentSize);
         status.setReceivedCount(receivedCount);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index f9c1021..87676aa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 public interface RemoteProcessGroup extends ComponentAuthorizable, 
Positionable, VersionedComponent {
@@ -107,12 +108,17 @@ public interface RemoteProcessGroup extends 
ComponentAuthorizable, Positionable,
     String getCommunicationsTimeout();
 
     /**
-     * @return Indicates whether or not the RemoteProcessGroup is currently 
scheduled to
-     * transmit data
+     * @return Indicates whether or not the RemoteProcessGroup is currently 
configured to transmit data OR if there are any threads that are currently
+     * active due to previously being scheduled to transmit that have not 
completed yet.
      */
     boolean isTransmitting();
 
     /**
+     * @return <code>true</code> if the RPG is configured to transmit, 
<code>false</code> otherwise
+     */
+    boolean isConfiguredToTransmit();
+
+    /**
      * Initiates communications between this instance and the remote instance.
      */
     void startTransmitting();
@@ -121,7 +127,7 @@ public interface RemoteProcessGroup extends 
ComponentAuthorizable, Positionable,
      * Immediately terminates communications between this instance and the
      * remote instance.
      */
-    void stopTransmitting();
+    Future<?> stopTransmitting();
 
     /**
      * Initiates communications between this instance and the remote instance
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index a2b03dc..99ba14e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1114,6 +1114,8 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
                 startRemoteGroupPortsAfterInitialization.clear();
             }
 
+            
flowManager.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+
             for (final Connection connection : 
flowManager.findAllConnections()) {
                 connection.getFlowFileQueue().startLoadBalancing();
             }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index f1d7d97..5abbd0d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -708,8 +708,14 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // request to stop all remote process groups
             flowManager.getRootGroup().findAllRemoteProcessGroups()
-                    .stream().filter(rpg -> rpg.isTransmitting())
-                    .forEach(RemoteProcessGroup::stopTransmitting);
+                    .stream().filter(RemoteProcessGroup::isTransmitting)
+                    .forEach(rpg -> {
+                        try {
+                            
rpg.stopTransmitting().get(rpg.getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
+                        } catch (final Exception e) {
+                            logger.warn("Encountered failure while waiting for 
{} to shutdown", rpg, e);
+                        }
+                    });
 
             // offload all queues on node
             final Set<Connection> connections = 
flowManager.findAllConnections();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 3ef7af4..80a8e9e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -60,7 +60,6 @@ import org.apache.nifi.groups.ComponentIdGenerator;
 import org.apache.nifi.groups.ComponentScheduler;
 import org.apache.nifi.groups.GroupSynchronizationOptions;
 import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
@@ -349,8 +348,6 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
 
                 // Inherit templates, now that all necessary Process Groups 
have been created
                 inheritTemplates(controller, versionedFlow);
-
-                
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
             }
 
             inheritSnippets(controller, proposedFlow);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 227a2d0..e30df55 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -39,6 +39,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.exception.UnreachableClusterException;
 import org.apache.nifi.remote.protocol.DataPacket;
@@ -273,14 +274,21 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             session.commitAsync();
         } catch (final Throwable t) {
             final String message = String.format("%s failed to communicate 
with remote NiFi instance due to %s", this, t.toString());
-            logger.error("{} failed to communicate with remote NiFi instance 
due to {}", this, t.toString());
-            if (logger.isDebugEnabled()) {
-                logger.error("", t);
+
+            // If Exception is a TransmissionDisabledException, it's because 
the user explicitly terminated the connection in the middle.
+            // No need to log errors for this, just debug log and move on. 
Otherwise, log the error.
+            if (t instanceof TransmissionDisabledException) {
+                logger.debug(message, t);
+            } else {
+                logger.error("{} failed to communicate with remote NiFi 
instance due to {}", this, t.toString());
+                if (logger.isDebugEnabled()) {
+                    logger.error("", t);
+                }
+
+                remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             }
 
-            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             transaction.error();
-
             throw new ProcessException(t);
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ae2fd44..bc576ec 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1870,7 +1870,7 @@ public final class DtoFactory {
         dto.setName(group.getName());
         dto.setPosition(createPositionDto(group.getPosition()));
         dto.setComments(group.getComments());
-        dto.setTransmitting(group.isTransmitting());
+        dto.setTransmitting(group.isConfiguredToTransmit());
         dto.setCommunicationsTimeout(group.getCommunicationsTimeout());
         dto.setYieldDuration(group.getYieldDuration());
         dto.setParentGroupId(group.getProcessGroup().getIdentifier());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index 274b5e4..f827211 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -443,9 +443,9 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
         if (isNotNull(isTransmitting)) {
             // start or stop as necessary
-            if (!remoteProcessGroup.isTransmitting() && isTransmitting) {
+            if (!remoteProcessGroup.isConfiguredToTransmit() && 
isTransmitting) {
                 remoteProcessGroup.startTransmitting();
-            } else if (remoteProcessGroup.isTransmitting() && !isTransmitting) 
{
+            } else if (remoteProcessGroup.isConfiguredToTransmit() && 
!isTransmitting) {
                 remoteProcessGroup.stopTransmitting();
             }
         }

Reply via email to