This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3511ce3 NIFI-9548: When disabling RPG transmission, wait for the
ports to finish stopping in a background thread instead of blocking the web
thread. Also moved the RPG initialization logic into the flow controller
instead of the flow service, and added a delay to reduce the likelihood of a
ConnectException when pointing to nodes in the same cluster.
3511ce3 is described below
commit 3511ce3d132f18fffd6e2a7aaf14962314d556ef
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 6 16:23:26 2022 -0500
NIFI-9548: When disabling RPG transmission, wait for the ports to finish
stopping in a background thread instead of blocking the web thread. Also moved
the RPG initialization logic into the flow controller instead of the flow
service, and added a delay to reduce the likelihood of a ConnectException when
pointing to nodes in the same cluster.
Signed-off-by: Joe Gresock <[email protected]>
This closes #5641.
---
.../nifi/remote/StandardRemoteProcessGroup.java | 37 ++++++++++++++++++----
.../apache/nifi/reporting/AbstractEventAccess.java | 2 +-
.../org/apache/nifi/groups/RemoteProcessGroup.java | 12 +++++--
.../org/apache/nifi/controller/FlowController.java | 2 ++
.../nifi/controller/StandardFlowService.java | 10 ++++--
.../serialization/VersionedFlowSynchronizer.java | 3 --
.../nifi/remote/StandardRemoteGroupPort.java | 18 ++++++++---
.../org/apache/nifi/web/api/dto/DtoFactory.java | 2 +-
.../dao/impl/StandardRemoteProcessGroupDAO.java | 4 +--
9 files changed, 66 insertions(+), 24 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 4f031ab..3c0954d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -69,6 +69,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
private final AtomicReference<String> comments = new AtomicReference<>();
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicBoolean transmitting = new AtomicBoolean(false);
+ private final AtomicBoolean configuredToTransmit = new
AtomicBoolean(false);
private final AtomicReference<String> versionedComponentId = new
AtomicReference<>();
private final SSLContext sslContext;
@@ -183,7 +185,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
initialized = true;
- backgroundThreadExecutor.submit(() -> {
+ backgroundThreadExecutor.schedule(() -> {
try {
refreshFlowContents();
} catch (final Exception e) {
@@ -194,7 +196,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
logger.warn("Unable to communicate with remote instance
{}", this, e);
}
}
- });
+ }, 3, TimeUnit.SECONDS);
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations,
0L, 60L, TimeUnit.SECONDS);
@@ -1043,6 +1045,11 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
@Override
+ public boolean isConfiguredToTransmit() {
+ return configuredToTransmit.get();
+ }
+
+ @Override
public void startTransmitting() {
writeLock.lock();
try {
@@ -1063,6 +1070,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
transmitting.set(true);
+ configuredToTransmit.set(true);
} finally {
writeLock.unlock();
}
@@ -1081,13 +1089,14 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
scheduler.startPort(port);
transmitting.set(true);
+ configuredToTransmit.set(true);
} finally {
writeLock.unlock();
}
}
@Override
- public void stopTransmitting() {
+ public Future<?> stopTransmitting() {
writeLock.lock();
try {
verifyCanStopTransmitting();
@@ -1100,12 +1109,24 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
scheduler.stopPort(port);
}
- // Wait for the ports to stop
+ configuredToTransmit.set(false);
+
+ return scheduler.submitFrameworkTask(this::waitForPortShutdown);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void waitForPortShutdown() {
+ // Wait for the ports to stop
+ try {
for (final RemoteGroupPort port : getInputPorts()) {
while (port.isRunning()) {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
}
}
@@ -1115,13 +1136,13 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
}
}
-
- transmitting.set(false);
} finally {
- writeLock.unlock();
+ transmitting.set(false);
}
}
@@ -1142,6 +1163,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
@@ -1163,6 +1185,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
}
+ configuredToTransmit.set(stillTransmitting);
transmitting.set(stillTransmitting);
} finally {
writeLock.unlock();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index e98a3f7..51d62d7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -550,7 +550,7 @@ public abstract class AbstractEventAccess implements
EventAccess {
}
status.setId(remoteGroup.getIdentifier());
- status.setTransmissionStatus(remoteGroup.isTransmitting() ?
TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
+ status.setTransmissionStatus(remoteGroup.isConfiguredToTransmit() ?
TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting);
status.setActiveThreadCount(activeThreadCount);
status.setReceivedContentSize(receivedContentSize);
status.setReceivedCount(receivedCount);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index f9c1021..87676aa 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable,
Positionable, VersionedComponent {
@@ -107,12 +108,17 @@ public interface RemoteProcessGroup extends
ComponentAuthorizable, Positionable,
String getCommunicationsTimeout();
/**
- * @return Indicates whether or not the RemoteProcessGroup is currently
scheduled to
- * transmit data
+ * @return Indicates whether or not the RemoteProcessGroup is currently
configured to transmit data OR if there are any threads that are currently
+ * active due to previously being scheduled to transmit that have not
completed yet.
*/
boolean isTransmitting();
/**
+ * @return <code>true</code> if the RPG is configured to transmit,
<code>false</code> otherwise
+ */
+ boolean isConfiguredToTransmit();
+
+ /**
* Initiates communications between this instance and the remote instance.
*/
void startTransmitting();
@@ -121,7 +127,7 @@ public interface RemoteProcessGroup extends
ComponentAuthorizable, Positionable,
* Immediately terminates communications between this instance and the
* remote instance.
*/
- void stopTransmitting();
+ Future<?> stopTransmitting();
/**
* Initiates communications between this instance and the remote instance
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index a2b03dc..99ba14e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1114,6 +1114,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
startRemoteGroupPortsAfterInitialization.clear();
}
+
flowManager.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+
for (final Connection connection :
flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index f1d7d97..5abbd0d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -708,8 +708,14 @@ public class StandardFlowService implements FlowService,
ProtocolHandler {
// request to stop all remote process groups
flowManager.getRootGroup().findAllRemoteProcessGroups()
- .stream().filter(rpg -> rpg.isTransmitting())
- .forEach(RemoteProcessGroup::stopTransmitting);
+ .stream().filter(RemoteProcessGroup::isTransmitting)
+ .forEach(rpg -> {
+ try {
+
rpg.stopTransmitting().get(rpg.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ logger.warn("Encountered failure while waiting for
{} to shutdown", rpg, e);
+ }
+ });
// offload all queues on node
final Set<Connection> connections =
flowManager.findAllConnections();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 3ef7af4..80a8e9e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -60,7 +60,6 @@ import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.groups.GroupSynchronizationOptions;
import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
@@ -349,8 +348,6 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
// Inherit templates, now that all necessary Process Groups
have been created
inheritTemplates(controller, versionedFlow);
-
-
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
}
inheritSnippets(controller, proposedFlow);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 227a2d0..e30df55 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -39,6 +39,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
@@ -273,14 +274,21 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
session.commitAsync();
} catch (final Throwable t) {
final String message = String.format("%s failed to communicate
with remote NiFi instance due to %s", this, t.toString());
- logger.error("{} failed to communicate with remote NiFi instance
due to {}", this, t.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", t);
+
+ // If Exception is a TransmissionDisabledException, it's because
the user explicitly terminated the connection in the middle.
+ // No need to log errors for this, just debug log and move on.
Otherwise, log the error.
+ if (t instanceof TransmissionDisabledException) {
+ logger.debug(message, t);
+ } else {
+ logger.error("{} failed to communicate with remote NiFi
instance due to {}", this, t.toString());
+ if (logger.isDebugEnabled()) {
+ logger.error("", t);
+ }
+
+ remoteGroup.getEventReporter().reportEvent(Severity.ERROR,
CATEGORY, message);
}
- remoteGroup.getEventReporter().reportEvent(Severity.ERROR,
CATEGORY, message);
transaction.error();
-
throw new ProcessException(t);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ae2fd44..bc576ec 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1870,7 +1870,7 @@ public final class DtoFactory {
dto.setName(group.getName());
dto.setPosition(createPositionDto(group.getPosition()));
dto.setComments(group.getComments());
- dto.setTransmitting(group.isTransmitting());
+ dto.setTransmitting(group.isConfiguredToTransmit());
dto.setCommunicationsTimeout(group.getCommunicationsTimeout());
dto.setYieldDuration(group.getYieldDuration());
dto.setParentGroupId(group.getProcessGroup().getIdentifier());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index 274b5e4..f827211 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -443,9 +443,9 @@ public class StandardRemoteProcessGroupDAO extends
ComponentDAO implements Remot
final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
if (isNotNull(isTransmitting)) {
// start or stop as necessary
- if (!remoteProcessGroup.isTransmitting() && isTransmitting) {
+ if (!remoteProcessGroup.isConfiguredToTransmit() &&
isTransmitting) {
remoteProcessGroup.startTransmitting();
- } else if (remoteProcessGroup.isTransmitting() && !isTransmitting)
{
+ } else if (remoteProcessGroup.isConfiguredToTransmit() &&
!isTransmitting) {
remoteProcessGroup.stopTransmitting();
}
}