This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 8143a12715 NIFI-11392 for CRON scheduled components, improved
cancellation of … (#7232)
8143a12715 is described below
commit 8143a127151a8b0ae4a12b3f300babe48a36c384
Author: Michael Moser <[email protected]>
AuthorDate: Tue May 30 11:09:06 2023 -0400
NIFI-11392 for CRON scheduled components, improved cancellation of … (#7232)
NIFI-11392 for CRON scheduled components, improved cancellation of futures
for thread cleanup
---
.../scheduling/QuartzSchedulingAgent.java | 75 ++++++++++------------
1 file changed, 33 insertions(+), 42 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 3b15ee45fb..beabd265e0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -25,16 +25,15 @@ import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.quartz.CronExpression;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
- private final Map<Object, List<AtomicBoolean>> canceledTriggers = new
HashMap<>();
+ private final Map<Object, Map<Integer, ScheduledFuture<?>>> quartzFutures
= new HashMap<>();
public QuartzSchedulingAgent(final FlowController flowController, final
FlowEngine flowEngine, final RepositoryContextFactory contextFactory) {
super(flowEngine, flowController, contextFactory);
@@ -42,12 +41,19 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
@Override
public void shutdown() {
+ quartzFutures.values().forEach(map -> map.values().forEach(future -> {
+ if (!future.isCancelled()) {
+ // stop scheduling to run and interrupt currently running
tasks.
+ future.cancel(true);
+ }
+ }));
+ flowEngine.shutdown();
}
@Override
public void doSchedule(final ReportingTaskNode taskNode, final
LifecycleState scheduleState) {
- final List<AtomicBoolean> existingTriggers =
canceledTriggers.get(taskNode);
- if (existingTriggers != null) {
+ final Map<Integer, ScheduledFuture<?>> componentFuturesMap =
quartzFutures.computeIfAbsent(taskNode, k -> new HashMap<>());
+ if (!componentFuturesMap.values().isEmpty()) {
throw new IllegalStateException("Cannot schedule " +
taskNode.getReportingTask().getIdentifier() + " because it is already scheduled
to run");
}
@@ -61,7 +67,6 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
final ReportingTaskWrapper taskWrapper = new
ReportingTaskWrapper(taskNode, scheduleState,
flowController.getExtensionManager());
- final AtomicBoolean canceled = new AtomicBoolean(false);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() -
System.currentTimeMillis();
@@ -71,37 +76,30 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
@Override
public void run() {
- if (canceled.get()) {
- return;
- }
-
taskWrapper.run();
- if (canceled.get()) {
- return;
- }
-
nextSchedule = getNextSchedule(nextSchedule, cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished running Reporting Task {}; next
scheduled time is at {} after a delay of {} milliseconds", taskNode,
nextSchedule, delay);
- flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+ final ScheduledFuture<?> newFuture = flowEngine.schedule(this,
delay, TimeUnit.MILLISECONDS);
+ final ScheduledFuture<?> oldFuture =
componentFuturesMap.put(0, newFuture);
+ scheduleState.replaceFuture(oldFuture, newFuture);
}
};
- final List<AtomicBoolean> triggers = new ArrayList<>(1);
- triggers.add(canceled);
- canceledTriggers.put(taskNode, triggers);
+ final ScheduledFuture<?> future = flowEngine.schedule(command,
initialDelay, TimeUnit.MILLISECONDS);
+ componentFuturesMap.put(0, future);
- flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
scheduleState.setScheduled(true);
+ scheduleState.setFutures(componentFuturesMap.values());
logger.info("Scheduled Reporting Task {} to run threads on schedule
{}", taskNode, cronSchedule);
}
@Override
public synchronized void doSchedule(final Connectable connectable, final
LifecycleState scheduleState) {
- final List<AtomicBoolean> existingTriggers =
canceledTriggers.get(connectable);
- if (existingTriggers != null) {
+ final Map<Integer, ScheduledFuture<?>> componentFuturesMap =
quartzFutures.computeIfAbsent(connectable, k -> new HashMap<>());
+ if (!componentFuturesMap.values().isEmpty()) {
throw new IllegalStateException("Cannot schedule " + connectable +
" because it is already scheduled to run");
}
@@ -114,11 +112,10 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
throw new IllegalStateException("Cannot schedule " + connectable +
" to run because its scheduling period is not valid");
}
- final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final ConnectableTask continuallyRunTask = new
ConnectableTask(this, connectable, flowController, contextFactory,
scheduleState);
- final AtomicBoolean canceled = new AtomicBoolean(false);
+ final AtomicInteger taskNumber = new AtomicInteger(i);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() -
System.currentTimeMillis();
@@ -129,10 +126,6 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
@Override
public void run() {
- if (canceled.get()) {
- return;
- }
-
try {
continuallyRunTask.invoke();
} catch (final RuntimeException re) {
@@ -141,24 +134,21 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
throw new ProcessException(e);
}
- if (canceled.get()) {
- return;
- }
-
nextSchedule = getNextSchedule(nextSchedule,
cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished task for {}; next scheduled time is
at {} after a delay of {} milliseconds", connectable, nextSchedule, delay);
- flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+ final ScheduledFuture<?> newFuture =
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
+ final ScheduledFuture<?> oldFuture =
componentFuturesMap.put(taskNumber.get(), newFuture);
+ scheduleState.replaceFuture(oldFuture, newFuture);
}
};
-
- flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
- triggers.add(canceled);
+ final ScheduledFuture<?> future = flowEngine.schedule(command,
initialDelay, TimeUnit.MILLISECONDS);
+ componentFuturesMap.put(taskNumber.get(), future);
}
- canceledTriggers.put(connectable, triggers);
+ scheduleState.setFutures(componentFuturesMap.values());
logger.info("Scheduled {} to run with {} threads on schedule {}",
connectable, connectable.getMaxConcurrentTasks(), cronSchedule);
}
@@ -173,12 +163,13 @@ public class QuartzSchedulingAgent extends
AbstractTimeBasedSchedulingAgent {
}
private void unschedule(final Object scheduled, final LifecycleState
scheduleState) {
- final List<AtomicBoolean> triggers =
canceledTriggers.remove(scheduled);
- if (triggers != null) {
- for (final AtomicBoolean trigger : triggers) {
- trigger.set(true);
+ quartzFutures.remove(scheduled);
+ scheduleState.getFutures().forEach(future -> {
+ if (!future.isCancelled()) {
+ // stop scheduling to run but do not interrupt currently
running tasks.
+ future.cancel(false);
}
- }
+ });
scheduleState.setScheduled(false);
logger.info("Stopped scheduling {} to run", scheduled);