This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 6baaaa6 NIFI-15514: Bug fix - ensure that when AbstractConnector is
started, it includes all components, not just processors and controller
services; code cleanup (#57)
6baaaa6 is described below
commit 6baaaa6e89f790946a90be9972299f23e2a007fb
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 10:22:32 2026 -0500
NIFI-15514: Bug fix - ensure that when AbstractConnector is started, it
includes all components, not just processors and controller services; code
cleanup (#57)
---
.../components/connector/AbstractConnector.java | 161 +++++++++++----------
1 file changed, 81 insertions(+), 80 deletions(-)
diff --git
a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index d837f7b..7689858 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -87,14 +87,7 @@ public abstract class AbstractConnector implements Connector
{
@Override
public void start(final FlowContext context) throws FlowUpdateException {
final ProcessGroupLifecycle lifecycle =
context.getRootGroup().getLifecycle();
-
- try {
-
lifecycle.enableControllerServices(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
- } catch (final Exception e) {
- throw new FlowUpdateException("Failed to enable Controller
Services", e);
- }
-
- lifecycle.startProcessors();
+
lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY);
}
@Override
@@ -129,7 +122,7 @@ public abstract class AbstractConnector implements
Connector {
}
});
- return stopProcessorsFuture.thenRun(() -> {
+ return stopProcessorsFuture.thenRunAsync(() -> {
try {
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
} catch (final Exception e) {
@@ -171,32 +164,8 @@ public abstract class AbstractConnector implements
Connector {
final CompletableFuture<Void> stopProcessorsFuture =
stopSourceProcessors(flowContext);
- final CompletableFuture<Void> startNonSourceFuture =
stopProcessorsFuture.thenRun(() -> {
- if (result.isDone()) {
- return;
- }
-
- final CompletableFuture<Void> enableServices =
flowContext.getRootGroup().getLifecycle().enableControllerServices(
-
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
- ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
-
- try {
- // Wait for all referenced services to be enabled.
- enableServices.get();
-
- if (!result.isDone()) {
- getLogger().info("Starting all non-source processors to
facilitate drainage of FlowFiles");
- startNonSourceProcessors(flowContext).get();
- }
- } catch (final Exception e) {
- try {
-
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
- } catch (final Exception e1) {
- e.addSuppressed(e1);
- }
-
- result.completeExceptionally(new RuntimeException("Failed to
start non-source processors while draining FlowFiles", e.getCause()));
- }
+ final CompletableFuture<Void> startNonSourceFuture =
stopProcessorsFuture.thenRunAsync(() -> {
+ startNonSourceProcessors(result, flowContext);
}).exceptionally(throwable -> {
if (!result.isDone()) {
result.completeExceptionally(new RuntimeException("Failed to
stop source processors while draining FlowFiles", throwable));
@@ -205,68 +174,100 @@ public abstract class AbstractConnector implements
Connector {
});
startNonSourceFuture.thenRunAsync(() -> {
- try {
- ensureDrainageUnblocked();
- } catch (final Exception e) {
- getLogger().warn("Failed to ensure drainage is unblocked when
draining FlowFiles", e);
+ completeDrain(result, flowContext, initialQueueSize);
+ }).exceptionally(throwable -> {
+ if (!result.isDone()) {
+ result.completeExceptionally(new RuntimeException("Failed
while draining FlowFiles", throwable));
}
+ return null;
+ });
- Exception failureReason = null;
- int iterations = 0;
- while (!isGroupDrained(flowContext.getRootGroup())) {
- if (result.isDone()) {
- getLogger().info("Drainage has been cancelled; will no
longer wait for FlowFiles to drain");
- break;
- }
+ return result;
+ }
- // Log the current queue size every 10 seconds (20 iterations
of 500ms) so that it's clear
- // whether or not progress is being made.
- if (iterations++ % 20 == 0) {
- final QueueSize queueSize =
flowContext.getRootGroup().getQueueSize();
- getLogger().info("Waiting for {} FlowFiles ({} bytes) to
drain",
- queueSize.getObjectCount(),
NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
- }
+ private void completeDrain(final CompletableFuture<Void> result, final
FlowContext flowContext, final QueueSize initialQueueSize) {
+ try {
+ ensureDrainageUnblocked();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to ensure drainage is unblocked when
draining FlowFiles", e);
+ }
- try {
- Thread.sleep(500);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- failureReason = e;
- break;
- }
+ Exception failureReason = null;
+ int iterations = 0;
+ while (!isGroupDrained(flowContext.getRootGroup())) {
+ if (result.isDone()) {
+ getLogger().info("Drainage has been cancelled; will no longer
wait for FlowFiles to drain");
+ break;
}
- // Log completion unless the result was completed exceptionally or
cancelled.
- if (!result.isDone()) {
- getLogger().info("All {} FlowFiles have drained from
Connector", initialQueueSize.getObjectCount());
+ // Log the current queue size every 10 seconds (20 iterations of
500ms) so that it's clear
+ // whether or not progress is being made.
+ if (iterations++ % 20 == 0) {
+ final QueueSize queueSize =
flowContext.getRootGroup().getQueueSize();
+ getLogger().info("Waiting for {} FlowFiles ({} bytes) to
drain",
+ queueSize.getObjectCount(),
NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
}
try {
- stop(flowContext);
- } catch (final Exception e) {
- getLogger().warn("Failed to stop source Processors after
draining FlowFiles", e);
- if (failureReason == null) {
- failureReason = e;
- } else {
- failureReason.addSuppressed(e);
- }
+ Thread.sleep(500);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ failureReason = e;
+ break;
}
+ }
+
+ // Log completion unless the result was completed exceptionally or
cancelled.
+ if (!result.isDone()) {
+ getLogger().info("All {} FlowFiles have drained from Connector",
initialQueueSize.getObjectCount());
+ }
- if (failureReason != null && !result.isDone()) {
- result.completeExceptionally(new RuntimeException("Interrupted
while waiting for " + AbstractConnector.this + " to drain", failureReason));
+ try {
+ stop(flowContext);
+ } catch (final Exception e) {
+ getLogger().warn("Failed to stop source Processors after draining
FlowFiles", e);
+ if (failureReason == null) {
+ failureReason = e;
+ } else {
+ failureReason.addSuppressed(e);
}
+ }
+
+ if (failureReason != null && !result.isDone()) {
+ result.completeExceptionally(new RuntimeException("Interrupted
while waiting for " + AbstractConnector.this + " to drain", failureReason));
+ }
+
+ if (!result.isDone()) {
+ result.complete(null);
+ }
+ }
+
+ private void startNonSourceProcessors(final CompletableFuture<Void>
result, final FlowContext flowContext) {
+ if (result.isDone()) {
+ return;
+ }
+
+ final CompletableFuture<Void> enableServices =
flowContext.getRootGroup().getLifecycle().enableControllerServices(
+ ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+ ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+ try {
+ // Wait for all referenced services to be enabled.
+ enableServices.get();
if (!result.isDone()) {
- result.complete(null);
+ getLogger().info("Starting all non-source processors to
facilitate drainage of FlowFiles");
+ startNonSourceProcessors(flowContext).get();
}
- }).exceptionally(throwable -> {
- if (!result.isDone()) {
- result.completeExceptionally(new RuntimeException("Failed
while draining FlowFiles", throwable));
+ } catch (final Exception e) {
+ try {
+
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+ } catch (final Exception e1) {
+ e.addSuppressed(e1);
}
- return null;
- });
- return result;
+ result.completeExceptionally(new RuntimeException("Failed to start
non-source processors while draining FlowFiles", e.getCause()));
+ }
}
/**