This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi-api.git
commit 8541f0f8d3b1beda69d7cfeabd6d17c5cc18517b Author: Mark Payne <[email protected]> AuthorDate: Tue Feb 3 10:22:32 2026 -0500 NIFI-15514: Bug fix - ensure that when AbstractConnector is started, it includes all components, not just processors and controller services; code cleanup (#57) --- .../components/connector/AbstractConnector.java | 161 +++++++++++---------- 1 file changed, 81 insertions(+), 80 deletions(-) diff --git a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java index d837f7b..7689858 100644 --- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java +++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java @@ -87,14 +87,7 @@ public abstract class AbstractConnector implements Connector { @Override public void start(final FlowContext context) throws FlowUpdateException { final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle(); - - try { - lifecycle.enableControllerServices(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); - } catch (final Exception e) { - throw new FlowUpdateException("Failed to enable Controller Services", e); - } - - lifecycle.startProcessors(); + lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY); } @Override @@ -129,7 +122,7 @@ public abstract class AbstractConnector implements Connector { } }); - return stopProcessorsFuture.thenRun(() -> { + return stopProcessorsFuture.thenRunAsync(() -> { try { lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); } catch (final Exception e) { @@ -171,32 +164,8 @@ public abstract class AbstractConnector implements Connector { final CompletableFuture<Void> stopProcessorsFuture = stopSourceProcessors(flowContext); - final CompletableFuture<Void> startNonSourceFuture = stopProcessorsFuture.thenRun(() -> { - if (result.isDone()) { - return; - } - - final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices( - ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, - ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); - - try { - // Wait for all referenced services to be enabled. - enableServices.get(); - - if (!result.isDone()) { - getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles"); - startNonSourceProcessors(flowContext).get(); - } - } catch (final Exception e) { - try { - flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); - } catch (final Exception e1) { - e.addSuppressed(e1); - } - - result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause())); - } + final CompletableFuture<Void> startNonSourceFuture = stopProcessorsFuture.thenRunAsync(() -> { + startNonSourceProcessors(result, flowContext); }).exceptionally(throwable -> { if (!result.isDone()) { result.completeExceptionally(new RuntimeException("Failed to stop source processors while draining FlowFiles", throwable)); @@ -205,68 +174,100 @@ public abstract class AbstractConnector implements Connector { }); startNonSourceFuture.thenRunAsync(() -> { - try { - ensureDrainageUnblocked(); - } catch (final Exception e) { - getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e); + completeDrain(result, flowContext, initialQueueSize); + }).exceptionally(throwable -> { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable)); } + return null; + }); - Exception failureReason = null; - int iterations = 0; - while (!isGroupDrained(flowContext.getRootGroup())) { - if (result.isDone()) { - getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain"); - break; - } + return result; + } - // Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear - // whether or not progress is being made. - if (iterations++ % 20 == 0) { - final QueueSize queueSize = flowContext.getRootGroup().getQueueSize(); - getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain", - queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount())); - } + private void completeDrain(final CompletableFuture<Void> result, final FlowContext flowContext, final QueueSize initialQueueSize) { + try { + ensureDrainageUnblocked(); + } catch (final Exception e) { + getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e); + } - try { - Thread.sleep(500); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - failureReason = e; - break; - } + Exception failureReason = null; + int iterations = 0; + while (!isGroupDrained(flowContext.getRootGroup())) { + if (result.isDone()) { + getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain"); + break; } - // Log completion unless the result was completed exceptionally or cancelled. - if (!result.isDone()) { - getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount()); + // Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear + // whether or not progress is being made. + if (iterations++ % 20 == 0) { + final QueueSize queueSize = flowContext.getRootGroup().getQueueSize(); + getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain", + queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount())); } try { - stop(flowContext); - } catch (final Exception e) { - getLogger().warn("Failed to stop source Processors after draining FlowFiles", e); - if (failureReason == null) { - failureReason = e; - } else { - failureReason.addSuppressed(e); - } + Thread.sleep(500); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + failureReason = e; + break; } + } + + // Log completion unless the result was completed exceptionally or cancelled. + if (!result.isDone()) { + getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount()); + } - if (failureReason != null && !result.isDone()) { - result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason)); + try { + stop(flowContext); + } catch (final Exception e) { + getLogger().warn("Failed to stop source Processors after draining FlowFiles", e); + if (failureReason == null) { + failureReason = e; + } else { + failureReason.addSuppressed(e); } + } + + if (failureReason != null && !result.isDone()) { + result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason)); + } + + if (!result.isDone()) { + result.complete(null); + } + } + + private void startNonSourceProcessors(final CompletableFuture<Void> result, final FlowContext flowContext) { + if (result.isDone()) { + return; + } + + final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + // Wait for all referenced services to be enabled. + enableServices.get(); if (!result.isDone()) { - result.complete(null); + getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles"); + startNonSourceProcessors(flowContext).get(); } - }).exceptionally(throwable -> { - if (!result.isDone()) { - result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable)); + } catch (final Exception e) { + try { + flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + } catch (final Exception e1) { + e.addSuppressed(e1); } - return null; - }); - return result; + result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause())); + } } /**
