This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 08edc33 NIFI-8629: Implemented the LogComponentStatuses task that
runs periodically in stateless.
08edc33 is described below
commit 08edc33eb7eb87c3790b7512948bdec5ed58652f
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 25 08:27:25 2021 -0400
NIFI-8629: Implemented the LogComponentStatuses task that runs periodically
in stateless.
Also fixed a typo in the ControllerStatusReportingTask counter header.
While comparing outputs, found that the task had a bug that caused it to
log only the counters generated by processors at the root level, so fixed
that as well.
This closes #5101
Signed-off-by: David Handermann <[email protected]>
---
.../controller/ControllerStatusReportingTask.java | 6 +-
.../controller/reporting/LogComponentStatuses.java | 205 +++++++++++++++++++++
.../stateless/engine/StandardStatelessEngine.java | 20 ++
.../nifi/stateless/engine/StatelessEngine.java | 3 +
.../flow/StandardStatelessDataflowFactory.java | 4 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 67 ++++++-
6 files changed, 293 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
index 9f61e38..a196c4f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/controller/ControllerStatusReportingTask.java
@@ -119,7 +119,7 @@ public class ControllerStatusReportingTask extends
AbstractReportingTask {
"Flow Files Out", "Bytes Read", "Bytes Written", "Tasks",
"Proc Time");
processorBorderLine = createLine(processorHeader);
- counterHeader = String.format(COUNTER_LINE_FORMAT, "Context Context",
"Counter Name", "Counter Value");
+ counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context",
"Counter Name", "Counter Value");
counterBorderLine = createLine(counterHeader);
}
@@ -228,6 +228,10 @@ public class ControllerStatusReportingTask extends
AbstractReportingTask {
}
}
}
+
+ for (final ProcessGroupStatus childGroupStatus :
status.getProcessGroupStatus()) {
+ printCounterStatus(childGroupStatus, builder, showDeltas, divisor);
+ }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
new file mode 100644
index 0000000..59155dc
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/LogComponentStatuses.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Runnable that writes two tables to the log at INFO level each time it is run:
+ * one row per processor summarizing its recent FlowFile activity, and one row per
+ * counter with its current value and rate of increase since the previous run.
+ * Nothing is computed or logged when INFO logging is disabled.
+ */
+public class LogComponentStatuses implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);
+    private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events
+
+    // Every specifier uses an explicit argument index (%N$...). A specifier without the '$' would be
+    // parsed as an ordinary-index specifier with a huge width and would print the wrong argument.
+    private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
+    private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";
+
+    private final FlowFileEventRepository flowFileEventRepository;
+    private final CounterRepository counterRepository;
+    private final FlowManager flowManager;
+
+    private final String processorHeader;
+    private final String processorBorderLine;
+    private final String counterHeader;
+    private final String counterBorderLine;
+
+    // Counter value seen on the previous run, keyed by counter identifier; used to derive the increase per second
+    private final Map<String, Long> previousCounterValues = new ConcurrentHashMap<>();
+    private volatile long lastTriggerTime = System.currentTimeMillis();
+
+    /**
+     * @param flowFileEventRepository source of the per-processor FlowFile events
+     * @param counterRepository source of the counters to log
+     * @param flowManager used to look up the root group and, through it, all processors
+     */
+    public LogComponentStatuses(final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final FlowManager flowManager) {
+        this.flowFileEventRepository = flowFileEventRepository;
+        this.counterRepository = counterRepository;
+        this.flowManager = flowManager;
+
+        processorHeader = String.format(PROCESSOR_LINE_FORMAT, "Processor Name", "Processor ID", "Processor Type", "Bytes Read/sec", "Bytes Written/sec", "Tasks/sec", "Nanos/Task",
+            "Percent of Processing Time");
+        processorBorderLine = createLine(processorHeader);
+
+        counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value", "Increase/sec");
+        counterBorderLine = createLine(counterHeader);
+    }
+
+    /**
+     * Builds a border line consisting of one '-' for every character of the given text.
+     *
+     * @param valueToUnderscore the text whose length determines the length of the border
+     * @return a String of dashes with the same length as the given text
+     */
+    private String createLine(final String valueToUnderscore) {
+        final StringBuilder processorBorderBuilder = new StringBuilder(valueToUnderscore.length());
+        for (int i = 0; i < valueToUnderscore.length(); i++) {
+            processorBorderBuilder.append('-');
+        }
+        return processorBorderBuilder.toString();
+    }
+
+    /**
+     * Logs the processor statuses followed by the counters. Any Exception is caught and logged
+     * rather than propagated to the caller.
+     */
+    @Override
+    public void run() {
+        try {
+            if (!logger.isInfoEnabled()) {
+                return;
+            }
+
+            logFlowFileEvents();
+            logCounters();
+        } catch (final Exception e) {
+            logger.error("Failed to log component statuses", e);
+        }
+    }
+
+    /**
+     * Logs one table row for each processor that has a FlowFile event, ordered by processing time, largest first.
+     */
+    private void logFlowFileEvents() {
+        final long timestamp = System.currentTimeMillis();
+        final ProcessGroup rootGroup = flowManager.getRootGroup();
+        final List<ProcessorNode> allProcessors = rootGroup.findAllProcessors();
+
+        long totalNanos = 0L;
+        final List<ProcessorAndEvent> processorsAndEvents = new ArrayList<>();
+        for (final ProcessorNode processorNode : allProcessors) {
+            final FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorNode.getIdentifier(), timestamp);
+            if (flowFileEvent == null) {
+                continue;
+            }
+
+            processorsAndEvents.add(new ProcessorAndEvent(processorNode, flowFileEvent));
+            totalNanos += flowFileEvent.getProcessingNanoseconds();
+        }
+
+        // comparingLong avoids boxing every value that is compared
+        final Comparator<ProcessorAndEvent> comparator = Comparator.comparingLong(procAndEvent -> procAndEvent.getEvent().getProcessingNanoseconds());
+        processorsAndEvents.sort(comparator.reversed());
+
+        final StringBuilder builder = new StringBuilder();
+        builder.append("Processor Statuses:\n");
+        builder.append(processorBorderLine);
+        builder.append("\n");
+        builder.append(processorHeader);
+        builder.append(processorBorderLine);
+        builder.append("\n");
+
+        for (final ProcessorAndEvent processorAndEvent : processorsAndEvents) {
+            addStatus(processorAndEvent, builder, METRIC_CACHE_SECONDS, totalNanos);
+        }
+
+        builder.append(processorBorderLine);
+        logger.info(builder.toString());
+    }
+
+    /**
+     * Appends a single formatted processor row to the given builder.
+     *
+     * @param processorAndEvent the processor and its FlowFile event
+     * @param builder the builder to append the row to
+     * @param secondsInEvent number of seconds that the event's totals are averaged over
+     * @param totalNanos sum of the processing nanoseconds of all processors being logged
+     */
+    private void addStatus(final ProcessorAndEvent processorAndEvent, final StringBuilder builder, final int secondsInEvent, final long totalNanos) {
+        final ProcessorNode processorNode = processorAndEvent.getProcessorNode();
+        final FlowFileEvent flowFileEvent = processorAndEvent.getEvent();
+
+        final long bytesReadPerSecond = flowFileEvent.getBytesRead() / secondsInEvent;
+        final long bytesWrittenPerSecond = flowFileEvent.getBytesWritten() / secondsInEvent;
+        final long totalInvocations = flowFileEvent.getInvocations();
+        final double invocationsPerSecond = (double) totalInvocations / (double) secondsInEvent;
+        final long nanos = flowFileEvent.getProcessingNanoseconds();
+
+        // Nanos/Task divides the total nanos by the total number of invocations, not by the per-second rate.
+        // Zero denominators are reported as 0 rather than NaN/Infinity.
+        final double nanosPerTask = totalInvocations == 0 ? 0D : (double) nanos / (double) totalInvocations;
+        final double processingPercent = totalNanos == 0 ? 0D : ((double) nanos / (double) totalNanos) * 100D;
+        final String processingPercentTwoDecimals = String.format("%.2f %%", processingPercent);
+
+        final String bytesRead = FormatUtils.formatDataSize(bytesReadPerSecond);
+        final String bytesWritten = FormatUtils.formatDataSize(bytesWrittenPerSecond);
+        final String invocationsPerSec = String.format("%.2f", invocationsPerSecond);
+        final String nanosPerInvocation = String.format("%.2f", nanosPerTask);
+
+        builder.append(String.format(PROCESSOR_LINE_FORMAT,
+            processorNode.getName(),
+            processorNode.getIdentifier(),
+            processorNode.getComponentType(),
+            bytesRead,
+            bytesWritten,
+            invocationsPerSec,
+            nanosPerInvocation,
+            processingPercentTwoDecimals));
+    }
+
+    /**
+     * Logs one table row for each counter, ordered by context and then name, including the
+     * per-second increase in the counter's value since the previous invocation of this method.
+     */
+    private void logCounters() {
+        final StringBuilder builder = new StringBuilder();
+        builder.append("Counters:\n");
+        builder.append(counterBorderLine);
+        builder.append("\n");
+        builder.append(counterHeader);
+        builder.append(counterBorderLine);
+        builder.append("\n");
+
+        final long now = System.currentTimeMillis();
+        final long millisSinceLastTrigger = now - lastTriggerTime;
+        final double secondsSinceLastTrigger = (double) millisSinceLastTrigger / 1000D;
+        lastTriggerTime = now;
+
+        // Sort a copy so that the List handed out by the repository is never reordered
+        final List<Counter> counters = new ArrayList<>(counterRepository.getCounters());
+        counters.sort(Comparator.comparing(Counter::getContext).thenComparing(Counter::getName));
+
+        for (final Counter counter : counters) {
+            // Read the value once so that the logged value and the computed increase agree
+            final long currentValue = counter.getValue();
+            final Long previousValue = previousCounterValues.put(counter.getIdentifier(), currentValue);
+            final long increaseSinceLast = currentValue - (previousValue == null ? 0L : previousValue);
+
+            // Two runs within the same millisecond would otherwise divide by zero
+            final double increasePerSecond = secondsSinceLastTrigger > 0D ? (double) increaseSinceLast / secondsSinceLastTrigger : 0D;
+            final String increase = String.format("%.2f", increasePerSecond);
+
+            builder.append(String.format(COUNTER_LINE_FORMAT, counter.getContext(), counter.getName(), currentValue, increase));
+        }
+
+        builder.append(counterBorderLine);
+        logger.info(builder.toString());
+    }
+
+    /**
+     * Pairs a processor with the FlowFile event that was reported for it.
+     */
+    private static class ProcessorAndEvent {
+        private final ProcessorNode processorNode;
+        private final FlowFileEvent event;
+
+        public ProcessorAndEvent(final ProcessorNode processorNode, final FlowFileEvent event) {
+            this.processorNode = processorNode;
+            this.event = event;
+        }
+
+        public ProcessorNode getProcessorNode() {
+            return processorNode;
+        }
+
+        public FlowFileEvent getEvent() {
+            return event;
+        }
+    }
+}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 6e47d82..8e2a024 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -31,6 +31,8 @@ import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.reporting.LogComponentStatuses;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -73,6 +75,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -93,6 +96,7 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
+ private final CounterRepository counterRepository;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@@ -118,6 +122,7 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
this.flowFileEventRepository =
requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must
be provided");
this.provenanceRepository =
requireNonNull(builder.provenanceRepository, "Provenance Repository must be
provided");
this.extensionRepository = requireNonNull(builder.extensionRepository,
"Extension Repository must be provided");
+ this.counterRepository = requireNonNull(builder.counterRepository,
"Counter Repository must be provided");
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new
FlowEngine(1, "Component Validation", true), () -> true);
@@ -170,6 +175,10 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
final List<ReportingTaskNode> reportingTaskNodes =
createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new
StandardStatelessFlow(childGroup, reportingTaskNodes,
controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler);
+
+ final LogComponentStatuses logComponentStatuses = new
LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
+ dataflow.scheduleBackgroundTask(logComponentStatuses, 1,
TimeUnit.MINUTES);
+
return dataflow;
}
@@ -491,6 +500,11 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
return flowManager;
}
+    /**
+     * @return the Counter Repository held by this engine
+     */
+    @Override
+    public CounterRepository getCounterRepository() {
+        return this.counterRepository;
+    }
+
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
@@ -503,6 +517,7 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
private FlowFileEventRepository flowFileEventRepository = null;
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
+ private CounterRepository counterRepository = null;
public Builder extensionManager(final ExtensionManager
extensionManager) {
this.extensionManager = extensionManager;
@@ -559,6 +574,11 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
return this;
}
+        /**
+         * Sets the Counter Repository to use when building the engine.
+         *
+         * @param counterRepository the Counter Repository
+         * @return this Builder, to allow calls to be chained
+         */
+        public Builder counterRepository(final CounterRepository counterRepository) {
+            this.counterRepository = counterRepository;
+            return this;
+        }
+
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
index 827443f..ef16eec 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -68,4 +69,6 @@ public interface StatelessEngine<T> {
ProvenanceRepository getProvenanceRepository();
FlowFileEventRepository getFlowFileEventRepository();
+
+ CounterRepository getCounterRepository();
}
\ No newline at end of file
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index b0ecaa9..4ea17a6 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -171,6 +171,8 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
}
};
+ final CounterRepository counterRepo = new
StandardCounterRepository();
+
final File krb5File = engineConfiguration.getKrb5File();
final KerberosConfig kerberosConfig = new KerberosConfig(null,
null, krb5File);
logger.info("Setting java.security.krb5.conf to {}",
krb5File.getAbsolutePath());
@@ -188,6 +190,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
.flowFileEventRepository(flowFileEventRepo)
.provenanceRepository(provenanceRepo)
.extensionRepository(extensionRepository)
+ .counterRepository(counterRepo)
.build();
final StatelessFlowManager flowManager = new
StatelessFlowManager(flowFileEventRepo, parameterContextManager,
statelessEngine, () -> true, sslContext);
@@ -197,7 +200,6 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
final ProcessContextFactory processContextFactory = new
CachingProcessContextFactory(rawProcessContextFactory);
contentRepo = new ByteArrayContentRepository();
flowFileRepo = new StatelessFlowFileRepository();
- final CounterRepository counterRepo = new
StandardCounterRepository();
final RepositoryContextFactory repositoryContextFactory = new
StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
counterRepo, provenanceRepo, stateManagerProvider);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 4feb11c..e8d5d49 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -74,6 +74,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -97,8 +99,10 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private final ProcessScheduler processScheduler;
private final AsynchronousCommitTracker tracker = new
AsynchronousCommitTracker();
private final TransactionThresholdMeter transactionThresholdMeter;
+ private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
private volatile ExecutorService runDataflowExecutor;
+ private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
@@ -212,23 +216,39 @@ public class StandardStatelessFlow implements
StatelessDataflow {
logger.info("Successfully initialized components in {} millis ({}
millis to perform validation, {} millis for services to enable)",
initializationMillis, validationMillis, serviceEnableMillis);
- runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
- final Thread thread =
Executors.defaultThreadFactory().newThread(r);
- final String flowName = dataflowDefinition.getFlowName();
- if (flowName == null) {
- thread.setName("Run Dataflow");
- } else {
- thread.setName("Run Dataflow " + flowName);
- }
+ // Create executor for dataflow
+ final String flowName = dataflowDefinition.getFlowName();
+ final String threadName = (flowName == null) ? "Run Dataflow" :
"Run Dataflow " + flowName;
+ runDataflowExecutor = Executors.newFixedThreadPool(1,
createNamedThreadFactory(threadName, false));
- return thread;
- });
+ // Periodically log component statuses
+ backgroundTaskExecutor = Executors.newScheduledThreadPool(1,
createNamedThreadFactory("Background Tasks", true));
+ backgroundTasks.forEach(task ->
backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(),
task.getSchedulingPeriod(), task.getSchedulingPeriod(),
task.getSchedulingUnit()));
} catch (final Throwable t) {
processScheduler.shutdown();
throw t;
}
}
+    /**
+     * Creates a ThreadFactory whose threads all carry the given name and daemon flag.
+     *
+     * @param name the name to assign to each thread that is created
+     * @param daemon whether or not the created threads should be daemon threads
+     * @return the ThreadFactory
+     */
+    private ThreadFactory createNamedThreadFactory(final String name, final boolean daemon) {
+        return new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable runnable) {
+                // A default factory is obtained for every thread, then the thread is renamed and flagged
+                final Thread created = Executors.defaultThreadFactory().newThread(runnable);
+                created.setName(name);
+                created.setDaemon(daemon);
+                return created;
+            }
+        };
+    }
+
+    /**
+     * Registers a background task that is to be run periodically once the dataflow has been initialized,
+     * until the dataflow has been shut down.
+     *
+     * @param task the task to run
+     * @param period how often to run the task
+     * @param unit the time unit that the period is expressed in
+     */
+    public void scheduleBackgroundTask(final Runnable task, final long period, final TimeUnit unit) {
+        final BackgroundTask backgroundTask = new BackgroundTask(task, period, unit);
+        backgroundTasks.add(backgroundTask);
+    }
+
private void waitForServicesEnabled(final ProcessGroup group) {
final long startTime = System.currentTimeMillis();
final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
@@ -268,6 +288,9 @@ public class StandardStatelessFlow implements
StatelessDataflow {
if (runDataflowExecutor != null) {
runDataflowExecutor.shutdown();
}
+ if (backgroundTaskExecutor != null) {
+ backgroundTaskExecutor.shutdown();
+ }
rootGroup.stopProcessing();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@@ -657,4 +680,28 @@ public class StandardStatelessFlow implements
StatelessDataflow {
this.stateValues = stateValues;
}
}
+
+    /**
+     * Immutable holder for a task together with the period and time unit on which it is to be run.
+     */
+    private static class BackgroundTask {
+        private final Runnable task;
+        private final long schedulingPeriod;
+        private final TimeUnit schedulingUnit;
+
+        public BackgroundTask(final Runnable task, final long schedulingPeriod, final TimeUnit schedulingUnit) {
+            this.schedulingUnit = schedulingUnit;
+            this.schedulingPeriod = schedulingPeriod;
+            this.task = task;
+        }
+
+        /** @return the task to run */
+        public Runnable getTask() {
+            return this.task;
+        }
+
+        /** @return the amount of time between runs, expressed in the scheduling unit */
+        public long getSchedulingPeriod() {
+            return this.schedulingPeriod;
+        }
+
+        /** @return the time unit of the scheduling period */
+        public TimeUnit getSchedulingUnit() {
+            return this.schedulingUnit;
+        }
+    }
}