markap14 commented on a change in pull request #5101:
URL: https://github.com/apache/nifi/pull/5101#discussion_r640014538
##########
File path:
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
##########
@@ -212,23 +217,39 @@ public void initialize() {
logger.info("Successfully initialized components in {} millis ({}
millis to perform validation, {} millis for services to enable)",
initializationMillis, validationMillis, serviceEnableMillis);
- runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
- final Thread thread =
Executors.defaultThreadFactory().newThread(r);
- final String flowName = dataflowDefinition.getFlowName();
- if (flowName == null) {
- thread.setName("Run Dataflow");
- } else {
- thread.setName("Run Dataflow " + flowName);
- }
+ // Create executor for dataflow
+ final String flowName = dataflowDefinition.getFlowName();
+ final String threadName = (flowName == null) ? "Run Dataflow" :
"Run Dataflow " + flowName;
+ runDataflowExecutor = Executors.newFixedThreadPool(1,
createNamedThreadFactory(threadName, false));
- return thread;
- });
+ // Periodically log component statuses
+ backgroundTaskExecutor = Executors.newScheduledThreadPool(1,
createNamedThreadFactory("Background Tasks", true));
+ backgroundTasks.forEach(task ->
backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(),
task.getSchedulingPeriod(), task.getSchedulingPeriod(),
task.getSchedulingUnit()));
} catch (final Throwable t) {
processScheduler.shutdown();
throw t;
}
}
+ private ThreadFactory createNamedThreadFactory(final String name, final
boolean daemon) {
+ return (Runnable r) -> {
+ final Thread thread =
Executors.defaultThreadFactory().newThread(r);
+ thread.setName(name);
+ thread.setDaemon(daemon);
+ return thread;
+ };
+ }
+
+ /**
+ * Schedules the given background task to run periodically after the
dataflow has been initialized until it has been shutdown
+ * @param task the task to run
+ * @param period how often to run it
+ * @param unit the unit for the time period
+ */
+ public void scheduleBackgroundTask(final Runnable task, final long period,
final TimeUnit unit) {
Review comment:
This is intentionally kept in StandardStatelessFlow. It is something
that is made available to the framework (the instantiator of the
StandardStatelessFlow, specifically), definitely not something that we want
made publicly available.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]