This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 43931b1504 NIFI-10518: Adding intended state to
ScheduledStateChangeListener (#6428)
43931b1504 is described below
commit 43931b15043e4cb712c6e8dca19ebaa2a55796e8
Author: Joe Gresock <[email protected]>
AuthorDate: Mon Sep 19 10:38:00 2022 -0400
NIFI-10518: Adding intended state to ScheduledStateChangeListener (#6428)
- NIFI-10518: Adding intended state to ScheduledStateChangeListener
- Notifying of scheduled state change when transitionComponentState is
called
- Notifying scheduled state change when reporting task state is changed
- Notifying of scheduled state changes for remote group port start/stop
component calls
---
.../StandardVersionedComponentSynchronizer.java | 78 ++++++++++++++--------
...StandardVersionedComponentSynchronizerTest.java | 8 +--
.../nifi/groups/ScheduledStateChangeListener.java | 17 ++---
3 files changed, 64 insertions(+), 39 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 47d4077bcb..09abfafd36 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -1164,12 +1165,12 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
if (proposed != null && proposed.getScheduledState() !=
org.apache.nifi.flow.ScheduledState.DISABLED) {
// Re-enable the controller service if necessary
serviceProvider.enableControllerServicesAsync(servicesToRestart);
- notifyScheduledStateChange(servicesToRestart,
synchronizationOptions);
+ notifyScheduledStateChange(servicesToRestart,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
// Restart any components that need to be restarted.
if (controllerService != null) {
serviceProvider.scheduleReferencingComponents(controllerService,
referencesToRestart, context.getComponentScheduler());
- referencesToRestart.forEach(componentNode ->
notifyScheduledStateChange(componentNode, synchronizationOptions));
+ referencesToRestart.forEach(componentNode ->
notifyScheduledStateChange(componentNode, synchronizationOptions,
org.apache.nifi.flow.ScheduledState.RUNNING));
}
}
}
@@ -1499,7 +1500,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
for (final ComponentNode stoppedComponent :
componentsToRestart) {
if (stoppedComponent instanceof Connectable) {
context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
- notifyScheduledStateChange(stoppedComponent,
synchronizationOptions);
+ notifyScheduledStateChange(stoppedComponent,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
}
@@ -1555,7 +1556,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
// Disable all Controller Services
final Collection<ControllerServiceNode> controllerServices =
processGroup.findAllControllerServices();
final Future<Void> disableServicesFuture =
context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices);
- notifyScheduledStateChange(controllerServices,
synchronizationOptions);
+ notifyScheduledStateChange(controllerServices,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
try {
disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (final ExecutionException ee) {
@@ -1658,7 +1659,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
// Stop all necessary enabled/active Controller Services
final Future<Void> serviceDisableFuture =
context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
- notifyScheduledStateChange(controllerServicesToStop,
synchronizationOptions);
+ notifyScheduledStateChange(controllerServicesToStop,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
try {
serviceDisableFuture.get(timeout -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
@@ -1686,11 +1687,11 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
} finally {
// Re-enable all Controller Services that we disabled and
restart all processors
context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
- notifyScheduledStateChange(controllerServicesToStop,
synchronizationOptions);
+ notifyScheduledStateChange(controllerServicesToStop,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
for (final ProcessorNode processor : processorsToStop) {
processor.getProcessGroup().startProcessor(processor,
false);
- notifyScheduledStateChange((ComponentNode)
processor,synchronizationOptions);
+ notifyScheduledStateChange((ComponentNode)
processor,synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
} finally {
@@ -2257,7 +2258,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
private void startComponents(final Collection<Connectable>
stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) {
for (final Connectable stoppedComponent : stoppedComponents) {
context.getComponentScheduler().startComponent(stoppedComponent);
- notifyScheduledStateChange(stoppedComponent,
synchronizationOptions);
+ notifyScheduledStateChange(stoppedComponent,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
@@ -2269,6 +2270,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
context.getComponentScheduler().transitionComponentState(port,
proposed.getScheduledState());
+ notifyScheduledStateChange(port, syncOptions,
proposed.getScheduledState());
}
private Port addInputPort(final ProcessGroup destination, final
VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final
String temporaryName) {
@@ -2490,45 +2492,62 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
return stoppedComponents;
}
- private void notifyScheduledStateChange(final Connectable component, final
FlowSynchronizationOptions synchronizationOptions) {
+ private void notifyScheduledStateChange(final Connectable component, final
FlowSynchronizationOptions synchronizationOptions, final
org.apache.nifi.flow.ScheduledState intendedState) {
try {
if (component instanceof ProcessorNode) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
component);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
component, intendedState);
} else if (component instanceof Port) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
component);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
component, intendedState);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes",
e);
}
}
- private void notifyScheduledStateChange(final ComponentNode component,
final FlowSynchronizationOptions synchronizationOptions) {
+ private void notifyScheduledStateChange(final ComponentNode component,
final FlowSynchronizationOptions synchronizationOptions, final
org.apache.nifi.flow.ScheduledState intendedState) {
+ if (component instanceof Triggerable && intendedState ==
org.apache.nifi.flow.ScheduledState.RUNNING && ((Triggerable)
component).getScheduledState() == ScheduledState.DISABLED) {
+ return;
+ }
try {
if (component instanceof ProcessorNode) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
component);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
component, intendedState);
} else if (component instanceof Port) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
component);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
component, intendedState);
} else if (component instanceof ControllerServiceNode) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode)
component);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode)
component, intendedState);
} else if (component instanceof ReportingTaskNode) {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode)
component);
+ final ReportingTaskNode reportingTaskNode =
(ReportingTaskNode) component;
+ if (intendedState ==
org.apache.nifi.flow.ScheduledState.RUNNING &&
reportingTaskNode.getScheduledState() == ScheduledState.DISABLED) {
+ return;
+ }
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reportingTaskNode,
intendedState);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes",
e);
}
}
- private void notifyScheduledStateChange(final
Collection<ControllerServiceNode> servicesToRestart, final
FlowSynchronizationOptions synchronizationOptions) {
+ private void notifyScheduledStateChange(final
Collection<ControllerServiceNode> servicesToRestart, final
FlowSynchronizationOptions synchronizationOptions,
+ final
org.apache.nifi.flow.ScheduledState intendedState) {
try {
-
servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
+ servicesToRestart.forEach(service -> {
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(service,
intendedState);
+ if (intendedState ==
org.apache.nifi.flow.ScheduledState.DISABLED) {
+
service.getReferences().findRecursiveReferences(ControllerServiceNode.class)
+ .forEach(reference ->
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reference,
org.apache.nifi.flow.ScheduledState.DISABLED));
+ } else if (intendedState ==
org.apache.nifi.flow.ScheduledState.ENABLED) {
+
service.getRequiredControllerServices().forEach(requiredService ->
synchronizationOptions.getScheduledStateChangeListener()
+ .onScheduledStateChange(requiredService,
org.apache.nifi.flow.ScheduledState.ENABLED));
+ }
+ });
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes",
e);
}
}
- private void notifyScheduledStateChange(final Port inputPort, final
FlowSynchronizationOptions synchronizationOptions) {
+ private void notifyScheduledStateChange(final Port inputPort, final
FlowSynchronizationOptions synchronizationOptions, final
org.apache.nifi.flow.ScheduledState intendedState) {
try {
-
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort);
+
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort,
intendedState);
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes",
e);
}
@@ -2544,12 +2563,12 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
case INPUT_PORT:
final Port inputPort = (Port) component;
component.getProcessGroup().stopInputPort(inputPort);
- notifyScheduledStateChange(inputPort, synchronizationOptions);
+ notifyScheduledStateChange(inputPort, synchronizationOptions,
org.apache.nifi.flow.ScheduledState.ENABLED);
return true;
case OUTPUT_PORT:
final Port outputPort = (Port) component;
component.getProcessGroup().stopOutputPort(outputPort);
- notifyScheduledStateChange(outputPort, synchronizationOptions);
+ notifyScheduledStateChange(outputPort, synchronizationOptions,
org.apache.nifi.flow.ScheduledState.ENABLED);
return true;
case PROCESSOR:
final ProcessorNode processorNode = (ProcessorNode) component;
@@ -2574,7 +2593,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
return true;
}
} finally {
- notifyScheduledStateChange((ComponentNode) processor,
synchronizationOptions);
+ notifyScheduledStateChange((ComponentNode) processor,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
@@ -2612,7 +2631,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
final Future<Void> future = entry.getValue();
waitForStopCompletion(future, component, timeout, timeoutAction);
- notifyScheduledStateChange(component, synchronizationOptions);
+ notifyScheduledStateChange(component, synchronizationOptions,
org.apache.nifi.flow.ScheduledState.ENABLED);
}
if (controllerService.isActive()) {
@@ -2636,7 +2655,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
// Disable the service and wait for completion, up to the timeout
allowed
final Future<Void> future =
serviceProvider.disableControllerServicesAsync(servicesToStop);
waitForStopCompletion(future, controllerService, timeout,
timeoutAction);
- notifyScheduledStateChange(servicesToStop, synchronizationOptions);
+ notifyScheduledStateChange(servicesToStop, synchronizationOptions,
org.apache.nifi.flow.ScheduledState.DISABLED);
}
}
@@ -2689,6 +2708,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
// Transition state to disabled/enabled/running
context.getComponentScheduler().transitionComponentState(processor,
proposed.getScheduledState());
+ notifyScheduledStateChange((ComponentNode) processor, syncOptions,
proposed.getScheduledState());
if (!isEqual(processor.getBundleCoordinate(),
proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate =
toCoordinate(proposed.getBundle());
@@ -2739,7 +2759,8 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
final Future<?> future = rpg.stopTransmitting();
try {
-
transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
+ transmitting.forEach(remoteGroupPort ->
synchronizationOptions.getScheduledStateChangeListener()
+ .onScheduledStateChange(remoteGroupPort,
org.apache.nifi.flow.ScheduledState.ENABLED));
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of
ScheduledState changes", e);
}
@@ -2947,10 +2968,12 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
if (versionedPort.getScheduledState() ==
org.apache.nifi.flow.ScheduledState.RUNNING) {
if (portState != ScheduledState.RUNNING) {
context.getComponentScheduler().startComponent(remoteGroupPort);
+ notifyScheduledStateChange(remoteGroupPort, syncOptions,
org.apache.nifi.flow.ScheduledState.RUNNING);
}
} else {
if (portState == ScheduledState.RUNNING) {
context.getComponentScheduler().stopComponent(remoteGroupPort);
+ notifyScheduledStateChange(remoteGroupPort, syncOptions,
org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
}
@@ -3112,7 +3135,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
processor.getProcessGroup().stopProcessor(processor);
processor.terminate();
- notifyScheduledStateChange((ComponentNode) processor,
synchronizationOptions);
+ notifyScheduledStateChange((ComponentNode) processor,
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
@@ -3419,6 +3442,7 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
}
break;
}
+ notifyScheduledStateChange(reportingTask, syncOptions,
proposed.getScheduledState());
} finally {
reportingTask.resumeValidationTrigger();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
index 3ed688c6ab..8360d4ff74 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
@@ -1137,22 +1137,22 @@ public class StandardVersionedComponentSynchronizerTest
{
private List<ScheduledStateUpdate<ReportingTaskNode>>
reportingTaskUpdates = new ArrayList<>();
@Override
- public void onScheduledStateChange(final ProcessorNode processor) {
+ public void onScheduledStateChange(final ProcessorNode processor,
final ScheduledState intendedState) {
processorUpdates.add(new ScheduledStateUpdate<>(processor,
processor.getScheduledState()));
}
@Override
- public void onScheduledStateChange(ControllerServiceNode
controllerService) {
+ public void onScheduledStateChange(ControllerServiceNode
controllerService, final ScheduledState intendedState) {
serviceUpdates.add(new
ControllerServiceStateUpdate(controllerService, controllerService.getState()));
}
@Override
- public void onScheduledStateChange(ReportingTaskNode reportingTask) {
+ public void onScheduledStateChange(ReportingTaskNode reportingTask,
final ScheduledState intendedState) {
reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask,
reportingTask.getScheduledState()));
}
@Override
- public void onScheduledStateChange(final Port port) {
+ public void onScheduledStateChange(final Port port, final
ScheduledState intendedState) {
portUpdates.add(new ScheduledStateUpdate<>(port,
port.getScheduledState()));
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
index d2c45e3165..64b17e4213 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
@@ -21,34 +21,35 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.ScheduledState;
public interface ScheduledStateChangeListener {
- void onScheduledStateChange(ProcessorNode processor);
+ void onScheduledStateChange(ProcessorNode processor, ScheduledState
intendedState);
- void onScheduledStateChange(Port port);
+ void onScheduledStateChange(Port port, ScheduledState intendedState);
- void onScheduledStateChange(ControllerServiceNode controllerService);
+ void onScheduledStateChange(ControllerServiceNode controllerService,
ScheduledState intendedState);
- void onScheduledStateChange(ReportingTaskNode reportingTask);
+ void onScheduledStateChange(ReportingTaskNode reportingTask,
ScheduledState intendedState);
ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() {
@Override
- public void onScheduledStateChange(ProcessorNode processor) {
+ public void onScheduledStateChange(ProcessorNode processor,
ScheduledState intendedState) {
}
@Override
- public void onScheduledStateChange(Port port) {
+ public void onScheduledStateChange(Port port, ScheduledState
intendedState) {
}
@Override
- public void onScheduledStateChange(ControllerServiceNode
controllerService) {
+ public void onScheduledStateChange(ControllerServiceNode
controllerService, ScheduledState intendedState) {
}
@Override
- public void onScheduledStateChange(ReportingTaskNode reportingTask) {
+ public void onScheduledStateChange(ReportingTaskNode reportingTask,
ScheduledState intendedState) {
}
};