This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 8143a12715 NIFI-11392 for CRON scheduled components, improved 
cancellation of … (#7232)
8143a12715 is described below

commit 8143a127151a8b0ae4a12b3f300babe48a36c384
Author: Michael Moser <[email protected]>
AuthorDate: Tue May 30 11:09:06 2023 -0400

    NIFI-11392 for CRON scheduled components, improved cancellation of … (#7232)
    
    NIFI-11392 for CRON scheduled components, improved cancellation of futures 
for thread cleanup
---
 .../scheduling/QuartzSchedulingAgent.java          | 75 ++++++++++------------
 1 file changed, 33 insertions(+), 42 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 3b15ee45fb..beabd265e0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -25,16 +25,15 @@ import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.quartz.CronExpression;
 
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
-    private final Map<Object, List<AtomicBoolean>> canceledTriggers = new 
HashMap<>();
+    private final Map<Object, Map<Integer, ScheduledFuture<?>>> quartzFutures 
= new HashMap<>();
 
     public QuartzSchedulingAgent(final FlowController flowController, final 
FlowEngine flowEngine, final RepositoryContextFactory contextFactory) {
         super(flowEngine, flowController, contextFactory);
@@ -42,12 +41,19 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
 
     @Override
     public void shutdown() {
+        quartzFutures.values().forEach(map -> map.values().forEach(future -> {
+            if (!future.isCancelled()) {
+                // stop scheduling to run and interrupt currently running 
tasks.
+                future.cancel(true);
+            }
+        }));
+        flowEngine.shutdown();
     }
 
     @Override
     public void doSchedule(final ReportingTaskNode taskNode, final 
LifecycleState scheduleState) {
-        final List<AtomicBoolean> existingTriggers = 
canceledTriggers.get(taskNode);
-        if (existingTriggers != null) {
+        final Map<Integer, ScheduledFuture<?>> componentFuturesMap = 
quartzFutures.computeIfAbsent(taskNode, k -> new HashMap<>());
+        if (!componentFuturesMap.values().isEmpty()) {
             throw new IllegalStateException("Cannot schedule " + 
taskNode.getReportingTask().getIdentifier() + " because it is already scheduled 
to run");
         }
 
@@ -61,7 +67,6 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
 
         final ReportingTaskWrapper taskWrapper = new 
ReportingTaskWrapper(taskNode, scheduleState, 
flowController.getExtensionManager());
 
-        final AtomicBoolean canceled = new AtomicBoolean(false);
         final Date initialDate = cronExpression.getTimeAfter(new Date());
         final long initialDelay = initialDate.getTime() - 
System.currentTimeMillis();
 
@@ -71,37 +76,30 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
 
             @Override
             public void run() {
-                if (canceled.get()) {
-                    return;
-                }
-
                 taskWrapper.run();
 
-                if (canceled.get()) {
-                    return;
-                }
-
                 nextSchedule = getNextSchedule(nextSchedule, cronExpression);
                 final long delay = getDelay(nextSchedule);
 
                 logger.debug("Finished running Reporting Task {}; next 
scheduled time is at {} after a delay of {} milliseconds", taskNode, 
nextSchedule, delay);
-                flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+                final ScheduledFuture<?> newFuture = flowEngine.schedule(this, 
delay, TimeUnit.MILLISECONDS);
+                final ScheduledFuture<?> oldFuture = 
componentFuturesMap.put(0, newFuture);
+                scheduleState.replaceFuture(oldFuture, newFuture);
             }
         };
 
-        final List<AtomicBoolean> triggers = new ArrayList<>(1);
-        triggers.add(canceled);
-        canceledTriggers.put(taskNode, triggers);
+        final ScheduledFuture<?> future = flowEngine.schedule(command, 
initialDelay, TimeUnit.MILLISECONDS);
+        componentFuturesMap.put(0, future);
 
-        flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
         scheduleState.setScheduled(true);
+        scheduleState.setFutures(componentFuturesMap.values());
         logger.info("Scheduled Reporting Task {} to run threads on schedule 
{}", taskNode, cronSchedule);
     }
 
     @Override
     public synchronized void doSchedule(final Connectable connectable, final 
LifecycleState scheduleState) {
-        final List<AtomicBoolean> existingTriggers = 
canceledTriggers.get(connectable);
-        if (existingTriggers != null) {
+        final Map<Integer, ScheduledFuture<?>> componentFuturesMap = 
quartzFutures.computeIfAbsent(connectable, k -> new HashMap<>());
+        if (!componentFuturesMap.values().isEmpty()) {
             throw new IllegalStateException("Cannot schedule " + connectable + 
" because it is already scheduled to run");
         }
 
@@ -114,11 +112,10 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
             throw new IllegalStateException("Cannot schedule " + connectable + 
" to run because its scheduling period is not valid");
         }
 
-        final List<AtomicBoolean> triggers = new ArrayList<>();
         for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
             final ConnectableTask continuallyRunTask = new 
ConnectableTask(this, connectable, flowController, contextFactory, 
scheduleState);
 
-            final AtomicBoolean canceled = new AtomicBoolean(false);
+            final AtomicInteger taskNumber = new AtomicInteger(i);
 
             final Date initialDate = cronExpression.getTimeAfter(new Date());
             final long initialDelay = initialDate.getTime() - 
System.currentTimeMillis();
@@ -129,10 +126,6 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
 
                 @Override
                 public void run() {
-                    if (canceled.get()) {
-                        return;
-                    }
-
                     try {
                         continuallyRunTask.invoke();
                     } catch (final RuntimeException re) {
@@ -141,24 +134,21 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
                         throw new ProcessException(e);
                     }
 
-                    if (canceled.get()) {
-                        return;
-                    }
-
                     nextSchedule = getNextSchedule(nextSchedule, 
cronExpression);
                     final long delay = getDelay(nextSchedule);
 
                     logger.debug("Finished task for {}; next scheduled time is 
at {} after a delay of {} milliseconds", connectable, nextSchedule, delay);
-                    flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+                    final ScheduledFuture<?> newFuture = 
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+                    final ScheduledFuture<?> oldFuture = 
componentFuturesMap.put(taskNumber.get(), newFuture);
+                    scheduleState.replaceFuture(oldFuture, newFuture);
                 }
             };
 
-
-            flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
-            triggers.add(canceled);
+            final ScheduledFuture<?> future = flowEngine.schedule(command, 
initialDelay, TimeUnit.MILLISECONDS);
+            componentFuturesMap.put(taskNumber.get(), future);
         }
 
-        canceledTriggers.put(connectable, triggers);
+        scheduleState.setFutures(componentFuturesMap.values());
         logger.info("Scheduled {} to run with {} threads on schedule {}", 
connectable, connectable.getMaxConcurrentTasks(), cronSchedule);
     }
 
@@ -173,12 +163,13 @@ public class QuartzSchedulingAgent extends 
AbstractTimeBasedSchedulingAgent {
     }
 
     private void unschedule(final Object scheduled, final LifecycleState 
scheduleState) {
-        final List<AtomicBoolean> triggers = 
canceledTriggers.remove(scheduled);
-        if (triggers != null) {
-            for (final AtomicBoolean trigger : triggers) {
-                trigger.set(true);
+        quartzFutures.remove(scheduled);
+        scheduleState.getFutures().forEach(future -> {
+            if (!future.isCancelled()) {
+                // stop scheduling to run but do not interrupt currently 
running tasks.
+                future.cancel(false);
             }
-        }
+        });
 
         scheduleState.setScheduled(false);
         logger.info("Stopped scheduling {} to run", scheduled);

Reply via email to