This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 43931b1504 NIFI-10518: Adding intended state to 
ScheduledStateChangeListener (#6428)
43931b1504 is described below

commit 43931b15043e4cb712c6e8dca19ebaa2a55796e8
Author: Joe Gresock <[email protected]>
AuthorDate: Mon Sep 19 10:38:00 2022 -0400

    NIFI-10518: Adding intended state to ScheduledStateChangeListener (#6428)
    
    - NIFI-10518: Adding intended state to ScheduledStateChangeListener
    - Notifying of scheduled state change when transitionComponentState is 
called
    - Notifying scheduled state change when reporting task state is changed
    - Notifying scheduledState changes for remote group port start/stop 
components calls
---
 .../StandardVersionedComponentSynchronizer.java    | 78 ++++++++++++++--------
 ...StandardVersionedComponentSynchronizerTest.java |  8 +--
 .../nifi/groups/ScheduledStateChangeListener.java  | 17 ++---
 3 files changed, 64 insertions(+), 39 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 47d4077bcb..09abfafd36 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.PropertyConfiguration;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -1164,12 +1165,12 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                 if (proposed != null && proposed.getScheduledState() != 
org.apache.nifi.flow.ScheduledState.DISABLED) {
                     // Re-enable the controller service if necessary
                     
serviceProvider.enableControllerServicesAsync(servicesToRestart);
-                    notifyScheduledStateChange(servicesToRestart, 
synchronizationOptions);
+                    notifyScheduledStateChange(servicesToRestart, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
 
                     // Restart any components that need to be restarted.
                     if (controllerService != null) {
                         
serviceProvider.scheduleReferencingComponents(controllerService, 
referencesToRestart, context.getComponentScheduler());
-                        referencesToRestart.forEach(componentNode -> 
notifyScheduledStateChange(componentNode, synchronizationOptions));
+                        referencesToRestart.forEach(componentNode -> 
notifyScheduledStateChange(componentNode, synchronizationOptions, 
org.apache.nifi.flow.ScheduledState.RUNNING));
                     }
                 }
             }
@@ -1499,7 +1500,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                 for (final ComponentNode stoppedComponent : 
componentsToRestart) {
                     if (stoppedComponent instanceof Connectable) {
                         
context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
-                        notifyScheduledStateChange(stoppedComponent, 
synchronizationOptions);
+                        notifyScheduledStateChange(stoppedComponent, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
                     }
                 }
             }
@@ -1555,7 +1556,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                 // Disable all Controller Services
                 final Collection<ControllerServiceNode> controllerServices = 
processGroup.findAllControllerServices();
                 final Future<Void> disableServicesFuture = 
context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices);
-                notifyScheduledStateChange(controllerServices, 
synchronizationOptions);
+                notifyScheduledStateChange(controllerServices, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
                 try {
                     disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS);
                 } catch (final ExecutionException ee) {
@@ -1658,7 +1659,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
 
                 // Stop all necessary enabled/active Controller Services
                 final Future<Void> serviceDisableFuture = 
context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
-                notifyScheduledStateChange(controllerServicesToStop, 
synchronizationOptions);
+                notifyScheduledStateChange(controllerServicesToStop, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
                 try {
                     serviceDisableFuture.get(timeout - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                 } catch (ExecutionException e) {
@@ -1686,11 +1687,11 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
             } finally {
                 // Re-enable all Controller Services that we disabled and 
restart all processors
                 
context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
-                notifyScheduledStateChange(controllerServicesToStop, 
synchronizationOptions);
+                notifyScheduledStateChange(controllerServicesToStop, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
 
                 for (final ProcessorNode processor : processorsToStop) {
                     processor.getProcessGroup().startProcessor(processor, 
false);
-                    notifyScheduledStateChange((ComponentNode) 
processor,synchronizationOptions);
+                    notifyScheduledStateChange((ComponentNode) 
processor,synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
                 }
             }
         } finally {
@@ -2257,7 +2258,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
     private void startComponents(final Collection<Connectable> 
stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) {
         for (final Connectable stoppedComponent : stoppedComponents) {
             context.getComponentScheduler().startComponent(stoppedComponent);
-            notifyScheduledStateChange(stoppedComponent, 
synchronizationOptions);
+            notifyScheduledStateChange(stoppedComponent, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
         }
     }
 
@@ -2269,6 +2270,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         
port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
 
         context.getComponentScheduler().transitionComponentState(port, 
proposed.getScheduledState());
+        notifyScheduledStateChange(port, syncOptions, 
proposed.getScheduledState());
     }
 
     private Port addInputPort(final ProcessGroup destination, final 
VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final 
String temporaryName) {
@@ -2490,45 +2492,62 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         return stoppedComponents;
     }
 
-    private void notifyScheduledStateChange(final Connectable component, final 
FlowSynchronizationOptions synchronizationOptions) {
+    private void notifyScheduledStateChange(final Connectable component, final 
FlowSynchronizationOptions synchronizationOptions, final 
org.apache.nifi.flow.ScheduledState intendedState) {
         try {
             if (component instanceof ProcessorNode) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
 component);
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
 component, intendedState);
             } else if (component instanceof Port) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
 component);
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
 component, intendedState);
             }
         } catch (final Exception e) {
             LOG.debug("Failed to notify listeners of ScheduledState changes", 
e);
         }
     }
 
-    private void notifyScheduledStateChange(final ComponentNode component, 
final FlowSynchronizationOptions synchronizationOptions) {
+    private void notifyScheduledStateChange(final ComponentNode component, 
final FlowSynchronizationOptions synchronizationOptions, final 
org.apache.nifi.flow.ScheduledState intendedState) {
+        if (component instanceof Triggerable && intendedState == 
org.apache.nifi.flow.ScheduledState.RUNNING && ((Triggerable) 
component).getScheduledState() == ScheduledState.DISABLED) {
+            return;
+        }
         try {
             if (component instanceof ProcessorNode) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
 component);
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode)
 component, intendedState);
             } else if (component instanceof Port) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
 component);
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port)
 component, intendedState);
             } else if (component instanceof ControllerServiceNode) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode)
 component);
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode)
 component, intendedState);
             } else if (component instanceof ReportingTaskNode) {
-                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode)
 component);
+                final ReportingTaskNode reportingTaskNode = 
(ReportingTaskNode) component;
+                if (intendedState == 
org.apache.nifi.flow.ScheduledState.RUNNING && 
reportingTaskNode.getScheduledState() == ScheduledState.DISABLED) {
+                    return;
+                }
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reportingTaskNode,
 intendedState);
             }
         } catch (final Exception e) {
             LOG.debug("Failed to notify listeners of ScheduledState changes", 
e);
         }
     }
 
-    private void notifyScheduledStateChange(final 
Collection<ControllerServiceNode> servicesToRestart, final 
FlowSynchronizationOptions synchronizationOptions) {
+    private void notifyScheduledStateChange(final 
Collection<ControllerServiceNode> servicesToRestart, final 
FlowSynchronizationOptions synchronizationOptions,
+                                            final 
org.apache.nifi.flow.ScheduledState intendedState) {
         try {
-            
servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
+            servicesToRestart.forEach(service -> {
+                
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(service,
 intendedState);
+                if (intendedState == 
org.apache.nifi.flow.ScheduledState.DISABLED) {
+                    
service.getReferences().findRecursiveReferences(ControllerServiceNode.class)
+                            .forEach(reference -> 
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reference,
 org.apache.nifi.flow.ScheduledState.DISABLED));
+                } else if (intendedState == 
org.apache.nifi.flow.ScheduledState.ENABLED) {
+                    
service.getRequiredControllerServices().forEach(requiredService -> 
synchronizationOptions.getScheduledStateChangeListener()
+                            .onScheduledStateChange(requiredService, 
org.apache.nifi.flow.ScheduledState.ENABLED));
+                }
+            });
         } catch (final Exception e) {
             LOG.debug("Failed to notify listeners of ScheduledState changes", 
e);
         }
     }
 
-    private void notifyScheduledStateChange(final Port inputPort, final 
FlowSynchronizationOptions synchronizationOptions) {
+    private void notifyScheduledStateChange(final Port inputPort, final 
FlowSynchronizationOptions synchronizationOptions, final 
org.apache.nifi.flow.ScheduledState intendedState) {
         try {
-            
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort);
+            
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort,
 intendedState);
         } catch (final Exception e) {
             LOG.debug("Failed to notify listeners of ScheduledState changes", 
e);
         }
@@ -2544,12 +2563,12 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
             case INPUT_PORT:
                 final Port inputPort = (Port) component;
                 component.getProcessGroup().stopInputPort(inputPort);
-                notifyScheduledStateChange(inputPort, synchronizationOptions);
+                notifyScheduledStateChange(inputPort, synchronizationOptions, 
org.apache.nifi.flow.ScheduledState.ENABLED);
                 return true;
             case OUTPUT_PORT:
                 final Port outputPort = (Port) component;
                 component.getProcessGroup().stopOutputPort(outputPort);
-                notifyScheduledStateChange(outputPort, synchronizationOptions);
+                notifyScheduledStateChange(outputPort, synchronizationOptions, 
org.apache.nifi.flow.ScheduledState.ENABLED);
                 return true;
             case PROCESSOR:
                 final ProcessorNode processorNode = (ProcessorNode) component;
@@ -2574,7 +2593,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                     return true;
             }
         } finally {
-            notifyScheduledStateChange((ComponentNode) processor, 
synchronizationOptions);
+            notifyScheduledStateChange((ComponentNode) processor, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
         }
     }
 
@@ -2612,7 +2631,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
             final Future<Void> future = entry.getValue();
 
             waitForStopCompletion(future, component, timeout, timeoutAction);
-            notifyScheduledStateChange(component, synchronizationOptions);
+            notifyScheduledStateChange(component, synchronizationOptions, 
org.apache.nifi.flow.ScheduledState.ENABLED);
         }
 
         if (controllerService.isActive()) {
@@ -2636,7 +2655,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
             // Disable the service and wait for completion, up to the timeout 
allowed
             final Future<Void> future = 
serviceProvider.disableControllerServicesAsync(servicesToStop);
             waitForStopCompletion(future, controllerService, timeout, 
timeoutAction);
-            notifyScheduledStateChange(servicesToStop, synchronizationOptions);
+            notifyScheduledStateChange(servicesToStop, synchronizationOptions, 
org.apache.nifi.flow.ScheduledState.DISABLED);
         }
     }
 
@@ -2689,6 +2708,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
 
             // Transition state to disabled/enabled/running
             
context.getComponentScheduler().transitionComponentState(processor, 
proposed.getScheduledState());
+            notifyScheduledStateChange((ComponentNode) processor, syncOptions, 
proposed.getScheduledState());
 
             if (!isEqual(processor.getBundleCoordinate(), 
proposed.getBundle())) {
                 final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
@@ -2739,7 +2759,8 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
 
                     final Future<?> future = rpg.stopTransmitting();
                     try {
-                        
transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
+                        transmitting.forEach(remoteGroupPort -> 
synchronizationOptions.getScheduledStateChangeListener()
+                                .onScheduledStateChange(remoteGroupPort, 
org.apache.nifi.flow.ScheduledState.ENABLED));
                     } catch (final Exception e) {
                         LOG.debug("Failed to notify listeners of 
ScheduledState changes", e);
                     }
@@ -2947,10 +2968,12 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
         if (versionedPort.getScheduledState() == 
org.apache.nifi.flow.ScheduledState.RUNNING) {
             if (portState != ScheduledState.RUNNING) {
                 
context.getComponentScheduler().startComponent(remoteGroupPort);
+                notifyScheduledStateChange(remoteGroupPort, syncOptions, 
org.apache.nifi.flow.ScheduledState.RUNNING);
             }
         } else {
             if (portState == ScheduledState.RUNNING) {
                 context.getComponentScheduler().stopComponent(remoteGroupPort);
+                notifyScheduledStateChange(remoteGroupPort, syncOptions, 
org.apache.nifi.flow.ScheduledState.ENABLED);
             }
         }
     }
@@ -3112,7 +3135,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
 
             processor.getProcessGroup().stopProcessor(processor);
             processor.terminate();
-            notifyScheduledStateChange((ComponentNode) processor, 
synchronizationOptions);
+            notifyScheduledStateChange((ComponentNode) processor, 
synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
         }
     }
 
@@ -3419,6 +3442,7 @@ public class StandardVersionedComponentSynchronizer 
implements VersionedComponen
                     }
                     break;
             }
+            notifyScheduledStateChange(reportingTask, syncOptions, 
proposed.getScheduledState());
         } finally {
             reportingTask.resumeValidationTrigger();
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
index 3ed688c6ab..8360d4ff74 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
@@ -1137,22 +1137,22 @@ public class StandardVersionedComponentSynchronizerTest 
{
         private List<ScheduledStateUpdate<ReportingTaskNode>> 
reportingTaskUpdates = new ArrayList<>();
 
         @Override
-        public void onScheduledStateChange(final ProcessorNode processor) {
+        public void onScheduledStateChange(final ProcessorNode processor, 
final ScheduledState intendedState) {
             processorUpdates.add(new ScheduledStateUpdate<>(processor, 
processor.getScheduledState()));
         }
 
         @Override
-        public void onScheduledStateChange(ControllerServiceNode 
controllerService) {
+        public void onScheduledStateChange(ControllerServiceNode 
controllerService, final ScheduledState intendedState) {
             serviceUpdates.add(new 
ControllerServiceStateUpdate(controllerService, controllerService.getState()));
         }
 
         @Override
-        public void onScheduledStateChange(ReportingTaskNode reportingTask) {
+        public void onScheduledStateChange(ReportingTaskNode reportingTask, 
final ScheduledState intendedState) {
             reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask, 
reportingTask.getScheduledState()));
         }
 
         @Override
-        public void onScheduledStateChange(final Port port) {
+        public void onScheduledStateChange(final Port port, final 
ScheduledState intendedState) {
             portUpdates.add(new ScheduledStateUpdate<>(port, 
port.getScheduledState()));
         }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
index d2c45e3165..64b17e4213 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java
@@ -21,34 +21,35 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.ScheduledState;
 
 public interface ScheduledStateChangeListener {
-    void onScheduledStateChange(ProcessorNode processor);
+    void onScheduledStateChange(ProcessorNode processor, ScheduledState 
intendedState);
 
-    void onScheduledStateChange(Port port);
+    void onScheduledStateChange(Port port, ScheduledState intendedState);
 
-    void onScheduledStateChange(ControllerServiceNode controllerService);
+    void onScheduledStateChange(ControllerServiceNode controllerService, 
ScheduledState intendedState);
 
-    void onScheduledStateChange(ReportingTaskNode reportingTask);
+    void onScheduledStateChange(ReportingTaskNode reportingTask, 
ScheduledState intendedState);
 
     ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() {
         @Override
-        public void onScheduledStateChange(ProcessorNode processor) {
+        public void onScheduledStateChange(ProcessorNode processor, 
ScheduledState intendedState) {
 
         }
 
         @Override
-        public void onScheduledStateChange(Port port) {
+        public void onScheduledStateChange(Port port, ScheduledState 
intendedState) {
 
         }
 
         @Override
-        public void onScheduledStateChange(ControllerServiceNode 
controllerService) {
+        public void onScheduledStateChange(ControllerServiceNode 
controllerService, ScheduledState intendedState) {
 
         }
 
         @Override
-        public void onScheduledStateChange(ReportingTaskNode reportingTask) {
+        public void onScheduledStateChange(ReportingTaskNode reportingTask, 
ScheduledState intendedState) {
 
         }
     };

Reply via email to