This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a52b818fb9 NIFI-14655: Fixed Stateless Engine so that it is able to
surface CPU Time, GC Time, etc. in Processor Stats. Do not shut down thread pool
for stateless group until we've given processors a chance to gracefully stop;
also ensure that, even if components support batch processing, we
immediately stop if the flow's desired state transitions to stopped
a52b818fb9 is described below
commit a52b818fb9d029dbf63a6145a7165c50f35797dd
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jun 12 15:04:08 2025 -0400
NIFI-14655: Fixed Stateless Engine so that it is able to surface CPU Time,
GC Time, etc. in Processor Stats
Do not shut down thread pool for stateless group until we've given
processors a chance to gracefully stop; also ensure that, even if components
support batch processing, we immediately stop if the flow's desired state
transitions to stopped
Signed-off-by: Joseph Witt <[email protected]>
This closes #10020
---
.../metrics/NanoTimePerformanceTracker.java | 14 +-
.../metrics/tracking/StandardStatsTracker.java | 125 ++++++++++++++++
.../repository/metrics/tracking/StatsTracker.java | 29 ++++
.../repository/metrics/tracking/TrackedStats.java | 43 ++++++
.../flow/StandardStatelessGroupNodeFactory.java | 6 +
.../nifi/controller/tasks/ConnectableTask.java | 162 ++-------------------
.../nifi/controller/tasks/StatelessFlowTask.java | 2 +-
.../nifi/groups/StandardStatelessGroupNode.java | 12 +-
.../scheduling/CronSchedulingAgentTest.java | 3 +
.../nifi/controller/tasks/TestConnectableTask.java | 4 +
.../stateless/engine/StandardStatelessEngine.java | 6 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 26 +++-
.../flow/StandardStatelessFlowCurrent.java | 69 +++++----
.../StatelessContentClaimWriteCache.java | 9 +-
.../repository/StatelessRepositoryContext.java | 6 +-
.../stateless/session/StatelessProcessSession.java | 22 +--
.../session/StatelessProcessSessionFactory.java | 12 +-
17 files changed, 339 insertions(+), 211 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
index 8e9c8cc762..386ae9be22 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
@@ -21,6 +21,7 @@ public class NanoTimePerformanceTracker implements
PerformanceTracker {
private final Timer contentReadTimer = new Timer();
private final Timer contentWriteTimer = new Timer();
private final Timer sessionCommitTimer = new Timer();
+ private int writeDepth = 0;
@Override
public void beginContentRead() {
@@ -39,12 +40,21 @@ public class NanoTimePerformanceTracker implements
PerformanceTracker {
@Override
public void beginContentWrite() {
- contentWriteTimer.start();
+ // Increase the write depth by 1 and start timer if the depth becomes
1.
+ // We do this because in many cases, we may call .beginContentWrite()
multiple times, then .endContentWrite().
+ // For example, if an OutputStream is used, calls to close() or
write() may then call flush(), so we don't want to stop the timer until the outermost write has ended.
+ writeDepth++;
+ if (writeDepth == 1) {
+ contentWriteTimer.start();
+ }
}
@Override
public void endContentWrite() {
- contentWriteTimer.stop();
+ writeDepth--;
+ if (writeDepth == 0) {
+ contentWriteTimer.stop();
+ }
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
new file mode 100644
index 0000000000..d39e29e6bf
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+import
org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+
+public class StandardStatsTracker implements StatsTracker {
+ private static final ThreadMXBean threadMXBean =
ManagementFactory.getThreadMXBean();
+
+ private final LongSupplier gcMillisTracker;
+ private final LongSupplier cpuTimeMillisTracker;
+ private final int iterationsBetweenCpuTracking;
+
+ private final AtomicLong iterations = new AtomicLong(0L);
+ private final AtomicReference<SampledCpuMetrics> sampledCpuMetrics = new
AtomicReference<>(new SampledCpuMetrics(0L, 0L));
+
+ public StandardStatsTracker(final LongSupplier gcMillisTracker,
+ final int expensiveMetricsTrackingPercentage) {
+
+ if (expensiveMetricsTrackingPercentage < 0) {
+ throw new IllegalArgumentException("CPU tracking percentage must
be non-negative");
+ } else if (expensiveMetricsTrackingPercentage > 100) {
+ throw new IllegalArgumentException("CPU tracking percentage must
be less than or equal to 100");
+ }
+
+ if (expensiveMetricsTrackingPercentage == 0) {
+ this.iterationsBetweenCpuTracking = Integer.MAX_VALUE; // never
track CPU time
+ } else {
+ this.iterationsBetweenCpuTracking = 100 /
expensiveMetricsTrackingPercentage; // calculate iterations between CPU tracking
+ }
+
+ this.gcMillisTracker = gcMillisTracker;
+ this.cpuTimeMillisTracker =
threadMXBean.isCurrentThreadCpuTimeSupported() ?
threadMXBean::getCurrentThreadCpuTime : () -> 0L;
+ }
+
+ @Override
+ public TrackedStats startTracking() {
+ final long startTimeNanos = System.nanoTime();
+ final long startGcMillis = gcMillisTracker.getAsLong();
+ final boolean trackExpensiveMetrics = iterations.getAndIncrement() %
iterationsBetweenCpuTracking == 0;
+ final long startCpuTimeMillis = trackExpensiveMetrics ?
cpuTimeMillisTracker.getAsLong() : 0;
+ final PerformanceTracker performanceTracker = trackExpensiveMetrics ?
new NanoTimePerformanceTracker() : new NopPerformanceTracker();
+
+ return new TrackedStats() {
+ @Override
+ public StandardFlowFileEvent end() {
+ final long endCpuTimeMillis = trackExpensiveMetrics ?
cpuTimeMillisTracker.getAsLong() : 0;
+
+ final StandardFlowFileEvent event = new
StandardFlowFileEvent();
+ event.setProcessingNanos(System.nanoTime() - startTimeNanos);
+ event.setGarbageCollectionMillis(gcMillisTracker.getAsLong() -
startGcMillis);
+ event.setCpuNanoseconds(endCpuTimeMillis - startCpuTimeMillis);
+
event.setContentReadNanoseconds(performanceTracker.getContentReadNanos());
+
event.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos());
+
event.setSessionCommitNanos(performanceTracker.getSessionCommitNanos());
+
+ if (trackExpensiveMetrics) {
+ addSampledCpuTime(event.getProcessingNanoseconds(),
event.getCpuNanoseconds());
+ } else {
+
event.setCpuNanoseconds(estimateCpuTime(event.getProcessingNanoseconds()));
+ }
+
+ return event;
+ }
+
+ @Override
+ public PerformanceTracker getPerformanceTracker() {
+ return performanceTracker;
+ }
+ };
+ }
+
+ private void addSampledCpuTime(final long processingNanoseconds, final
long cpuNanoseconds) {
+ boolean updated = false;
+ while (!updated) {
+ final SampledCpuMetrics existingMetrics = sampledCpuMetrics.get();
+ final SampledCpuMetrics newMetrics =
existingMetrics.add(processingNanoseconds, cpuNanoseconds);
+ updated = sampledCpuMetrics.compareAndSet(existingMetrics,
newMetrics);
+ }
+ }
+
+ private long estimateCpuTime(final long processingNanoseconds) {
+ final SampledCpuMetrics metrics = sampledCpuMetrics.get();
+ if (metrics.processingNanos() == 0) {
+ return 0; // Avoid division by zero
+ }
+
+ // Estimate CPU time based on the ratio of sampled CPU time to
processing time
+ return (processingNanoseconds * metrics.cpuNanos()) /
metrics.processingNanos();
+ }
+
+ private record SampledCpuMetrics(long processingNanos, long cpuNanos) {
+
+ SampledCpuMetrics add(long processingNanos, long cpuNanos) {
+ return new SampledCpuMetrics(
+ this.processingNanos + processingNanos,
+ this.cpuNanos + cpuNanos
+ );
+ }
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
new file mode 100644
index 0000000000..6d64530503
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+public interface StatsTracker {
+
+ /**
+ * Starts tracking stats for a Processor, returning a TrackedStats object
that can be used to capture
+ * stats when it is ended.
+ * @return a TrackedStats that can be used to capture stats when it is
ended.
+ */
+ TrackedStats startTracking();
+
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
new file mode 100644
index 0000000000..e9a56085dd
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface TrackedStats {
+
+ /**
+ * Ends the tracking of stats and returns a StandardFlowFileEvent that
contains the stats.
+ * StandardFlowFileEvent is used here because it is often the case that we
want to populate additional
+ * fields in the FlowFileEvent, and the StandardFlowFileEvent allows us to
do this.
+ *
+ * @return a StandardFlowFileEvent that contains the stats collected
during tracking.
+ */
+ StandardFlowFileEvent end();
+
+ /**
+ * Returns the PerformanceTracker associated with the TrackedStats so that
it may be provided to
+ * {@link ProcessSession} etc. to gather performance metrics that are
relevant.
+ *
+ * @return the PerformanceTracker associated with the TrackedStats
+ */
+ PerformanceTracker getPerformanceTracker();
+
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
index 3658b74763..41568ed9a2 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
@@ -28,6 +28,8 @@ import
org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.NonPurgeableContentRepository;
import org.apache.nifi.controller.repository.StatelessBridgeFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.scheduling.StatelessProcessScheduler;
import
org.apache.nifi.controller.scheduling.StatelessProcessSchedulerInitializationContext;
import org.apache.nifi.engine.FlowEngine;
@@ -151,6 +153,9 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
}
};
+ final StatsTracker statsTracker = new
StandardStatsTracker(flowController.getGarbageCollectionLog()::getTotalGarbageCollectionMillis,
+ flowController.getPerformanceTrackingPercentage());
+
final LogRepository logRepository =
LogRepositoryFactory.getRepository(group.getIdentifier());
final StatelessGroupNode statelessGroupNode = new
StandardStatelessGroupNode.Builder()
.rootGroup(group)
@@ -165,6 +170,7 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
.bulletinRepository(flowController.getBulletinRepository())
.statelessGroupFactory(statelessGroupFactory)
.lifecycleStateManager(flowController.getLifecycleStateManager())
+ .statsTracker(statsTracker)
.boredYieldDuration(flowController.getBoredYieldDuration(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 88841c3c73..2453415ec1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -31,10 +31,10 @@ import
org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
-import
org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker;
-import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
-import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.TrackedStats;
import
org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -53,8 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,10 +71,7 @@ public class ConnectableTask {
private final ProcessContext processContext;
private final FlowController flowController;
private final int numRelationships;
- private final ThreadMXBean threadMXBean;
- private final AtomicLong invocations = new AtomicLong(0L);
- private volatile SampledMetrics sampledMetrics = new SampledMetrics();
- private final int perfTrackingNthIteration;
+ private final StatsTracker statsTracker;
public ConnectableTask(final SchedulingAgent schedulingAgent, final
Connectable connectable,
final FlowController flowController, final
RepositoryContextFactory contextFactory, final LifecycleState lifecycleState) {
@@ -86,7 +81,6 @@ public class ConnectableTask {
this.lifecycleState = lifecycleState;
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
- this.threadMXBean = ManagementFactory.getThreadMXBean();
final StateManager stateManager = new
TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()),
lifecycleState::isTerminated);
if (connectable instanceof ProcessorNode) {
@@ -98,12 +92,8 @@ public class ConnectableTask {
repositoryContext = contextFactory.newProcessContext(connectable, new
AtomicLong(0L));
- final int perfTrackingPercentage =
flowController.getPerformanceTrackingPercentage();
- if (perfTrackingPercentage == 0) {
- perfTrackingNthIteration = 0;
- } else {
- perfTrackingNthIteration = 100 / perfTrackingPercentage;
- }
+ statsTracker = new
StandardStatsTracker(flowController.getGarbageCollectionLog()::getTotalGarbageCollectionMillis,
+ flowController.getPerformanceTrackingPercentage());
}
public Connectable getConnectable() {
@@ -197,33 +187,19 @@ public class ConnectableTask {
}
logger.debug("Triggering {}", connectable);
- final long totalInvocationCount = invocations.getAndIncrement();
-
- final boolean measureExpensiveMetrics =
isMeasureExpensiveMetrics(totalInvocationCount);
- final boolean measureCpuTime = measureExpensiveMetrics &&
threadMXBean.isCurrentThreadCpuTimeSupported();
- final long startCpuTime;
- final long startGcMillis;
- if (measureCpuTime) {
- startCpuTime = threadMXBean.getCurrentThreadCpuTime();
- startGcMillis =
flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
- } else {
- startCpuTime = 0L;
- startGcMillis = 0L;
- }
-
- final PerformanceTracker performanceTracker = measureExpensiveMetrics
? new NanoTimePerformanceTracker() : new NopPerformanceTracker();
+ final TrackedStats stats = statsTracker.startTracking();
final long batchNanos =
connectable.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession;
final boolean batch;
if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {
- rawSession = new StandardProcessSession(repositoryContext,
lifecycleState::isTerminated, performanceTracker);
+ rawSession = new StandardProcessSession(repositoryContext,
lifecycleState::isTerminated, stats.getPerformanceTracker());
sessionFactory = new BatchingSessionFactory(rawSession);
batch = true;
} else {
rawSession = null;
- sessionFactory = new
StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated,
performanceTracker);
+ sessionFactory = new
StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated,
stats.getPerformanceTracker());
batch = false;
}
@@ -299,7 +275,7 @@ public class ConnectableTask {
}
try {
- updateEventRepo(startNanos, startCpuTime, startGcMillis,
invocationCount, measureCpuTime, performanceTracker);
+ updateEventRepo(stats, invocationCount);
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository
for {}; statistics may be inaccurate.", connectable.getRunnableComponent(), e);
}
@@ -312,128 +288,14 @@ public class ConnectableTask {
return InvocationResult.DO_NOT_YIELD;
}
- private void updateEventRepo(final long startNanoTime, final long
startCpuTime, final long startGcMillis, final int invocationCount, final
boolean measureCpuTime,
- final PerformanceTracker performanceTracker)
- throws IOException {
- final long processingNanos = System.nanoTime() - startNanoTime;
- final StandardFlowFileEvent flowFileEvent = new
StandardFlowFileEvent();
- flowFileEvent.setProcessingNanos(processingNanos);
+ private void updateEventRepo(final TrackedStats stats, final int
invocationCount) throws IOException {
+ final StandardFlowFileEvent flowFileEvent = stats.end();
flowFileEvent.setInvocations(invocationCount);
-
- // We won't always measure CPU time because it's expensive to
calculate. So when we do measure it, we keep track of
- // total CPU nanos measured as well as total processing time for those
iterations. This gives us a ratio of CPU time vs. total time.
- // We can then use that to extrapolate an approximate CPU Time.
- if (measureCpuTime) {
- updatePerformanceTrackingMetrics(flowFileEvent,
performanceTracker, startCpuTime, startGcMillis, processingNanos);
- } else {
- estimatePerformanceTrackingMetrics(flowFileEvent, processingNanos);
- }
-
repositoryContext.getFlowFileEventRepository().updateRepository(flowFileEvent,
connectable.getIdentifier());
}
- private void estimatePerformanceTrackingMetrics(final
StandardFlowFileEvent flowFileEvent, final long processingNanos) {
- // Use ratio of measured CPU time vs. total time for that those
iterations, to estimate the CPU time for this iteration.
- final SampledMetrics currentMetrics = sampledMetrics;
- final double processingRatio = (double) processingNanos / (double)
Math.max(1, currentMetrics.getProcessingNanosSampled());
- flowFileEvent.setCpuNanoseconds((long) (processingRatio *
currentMetrics.getTotalCpuNanos()));
- flowFileEvent.setContentReadNanoseconds((long) (processingRatio *
currentMetrics.getReadNanos()));
- flowFileEvent.setContentWriteNanoseconds((long) (processingRatio *
currentMetrics.getWriteNanos()));
- flowFileEvent.setSessionCommitNanos((long) (processingRatio *
currentMetrics.getSessionCommitNanos()));
- flowFileEvent.setGarbageCollectionMillis((long) (processingRatio *
currentMetrics.getGcMillis()));
- }
-
- private void updatePerformanceTrackingMetrics(final StandardFlowFileEvent
flowFileEvent, final PerformanceTracker performanceTracker, final long
startCpuTime,
- final long startGcMillis,
final long processingNanos) {
- final long cpuTime = threadMXBean.getCurrentThreadCpuTime();
- final long cpuNanos = cpuTime - startCpuTime;
-
- final long endGcMillis =
flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
- final long gcMillis = endGcMillis - startGcMillis;
-
- flowFileEvent.setCpuNanoseconds(cpuNanos);
-
flowFileEvent.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos());
-
flowFileEvent.setContentReadNanoseconds(performanceTracker.getContentReadNanos());
-
flowFileEvent.setSessionCommitNanos(performanceTracker.getSessionCommitNanos());
- flowFileEvent.setGarbageCollectionMillis(gcMillis);
-
- final SampledMetrics previousMetrics = sampledMetrics;
- final SampledMetrics updatedMetrics = new SampledMetrics();
-
updatedMetrics.setProcessingNanosSampled(previousMetrics.getProcessingNanosSampled()
+ processingNanos);
- updatedMetrics.setTotalCpuNanos(previousMetrics.getTotalCpuNanos() +
cpuNanos);
- updatedMetrics.setReadNanos(previousMetrics.getReadNanos() +
performanceTracker.getContentReadNanos());
- updatedMetrics.setWriteNanos(previousMetrics.getWriteNanos() +
performanceTracker.getContentWriteNanos());
-
updatedMetrics.setSessionCommitNanos(previousMetrics.getSessionCommitNanos() +
performanceTracker.getSessionCommitNanos());
- updatedMetrics.setGcMillis(gcMillis);
- this.sampledMetrics = updatedMetrics;
- }
-
- private boolean isMeasureExpensiveMetrics(final long invocationCount) {
- if (perfTrackingNthIteration == 0) { // A value of 0 indicates we
should never track performance metrics.
- return false;
- }
-
- return invocationCount % perfTrackingNthIteration == 0;
- }
-
private ComponentLog getComponentLog() {
return new SimpleProcessLogger(connectable.getIdentifier(),
connectable.getRunnableComponent(), new StandardLoggingContext(connectable));
}
- private static class SampledMetrics {
- private long processingNanosSampled = 0L;
- private long totalCpuNanos = 0L;
- private long readNanos = 0L;
- private long writeNanos = 0L;
- private long sessionCommitNanos = 0L;
- private long gcMillis = 0L;
-
- public long getProcessingNanosSampled() {
- return processingNanosSampled;
- }
-
- public void setProcessingNanosSampled(final long
processingNanosSampled) {
- this.processingNanosSampled = processingNanosSampled;
- }
-
- public long getTotalCpuNanos() {
- return totalCpuNanos;
- }
-
- public void setTotalCpuNanos(final long totalCpuNanos) {
- this.totalCpuNanos = totalCpuNanos;
- }
-
- public long getReadNanos() {
- return readNanos;
- }
-
- public void setReadNanos(final long readNanos) {
- this.readNanos = readNanos;
- }
-
- public long getWriteNanos() {
- return writeNanos;
- }
-
- public void setWriteNanos(final long writeNanos) {
- this.writeNanos = writeNanos;
- }
-
- public long getSessionCommitNanos() {
- return sessionCommitNanos;
- }
-
- public void setSessionCommitNanos(final long sessionCommitNanos) {
- this.sessionCommitNanos = sessionCommitNanos;
- }
-
- public long getGcMillis() {
- return gcMillis;
- }
-
- public void setGcMillis(final long gcMillis) {
- this.gcMillis = gcMillis;
- }
- }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 96d77aef75..5bdc42c52d 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -190,7 +190,7 @@ public class StatelessFlowTask {
try {
int invocationCount = 0;
- while ((invocationCount == 0 || allowBatch) &&
System.currentTimeMillis() < endTime) {
+ while ((invocationCount == 0 || allowBatch) &&
statelessGroupNode.getDesiredState() == ScheduledState.RUNNING &&
System.currentTimeMillis() < endTime) {
invocationCount++;
final Invocation invocation = new Invocation();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 6f0f96723d..96ff6c9610 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
@@ -105,6 +106,7 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
private final StatelessGroupFactory statelessGroupFactory;
private final LifecycleStateManager lifecycleStateManager;
private final FlowFileEventRepository flowFileEventRepository;
+ private final StatsTracker statsTracker;
private final long boredYieldMillis;
private volatile List<Connection> incomingConnections;
@@ -132,6 +134,7 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
this.statelessGroupFactory = builder.statelessGroupFactory;
this.lifecycleStateManager = builder.lifecycleStateManager;
this.flowFileEventRepository = builder.flowFileEventRepository;
+ this.statsTracker = builder.statsTracker;
this.boredYieldMillis = builder.boredYieldMillis;
}
@@ -354,7 +357,8 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
processScheduler,
bulletinRepository,
lifecycleStateManager,
- Duration.of(10, ChronoUnit.SECONDS));
+ Duration.of(10, ChronoUnit.SECONDS),
+ statsTracker);
// We don't want to enable Controller Services because we want to use
the actual Controller Services that exist within the
// Standard NiFi instance, not the ephemeral ones that created during
the initialization of the Stateless Group.
@@ -922,6 +926,7 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
private StatelessGroupFactory statelessGroupFactory;
private LifecycleStateManager lifecycleStateManager;
private FlowFileEventRepository flowFileEventRepository;
+ private StatsTracker statsTracker;
private long boredYieldMillis = 10L;
public Builder rootGroup(final ProcessGroup rootGroup) {
@@ -984,6 +989,11 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
return this;
}
+ public Builder statsTracker(final StatsTracker statsTracker) {
+ this.statsTracker = statsTracker;
+ return this;
+ }
+
public Builder boredYieldDuration(final long period, final TimeUnit
timeUnit) {
this.boredYieldMillis = TimeUnit.MILLISECONDS.convert(period,
timeUnit);
return this;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
index 5f0fef1ace..3b46865ae5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.GarbageCollectionLog;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.reporting.ReportingTask;
@@ -33,6 +34,7 @@ import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -86,6 +88,7 @@ class CronSchedulingAgentTest {
when(connectable.getIdentifier()).thenReturn(componentId);
when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider);
when(stateManagerProvider.getStateManager(eq(componentId))).thenReturn(stateManager);
+
when(flowController.getGarbageCollectionLog()).thenReturn(mock(GarbageCollectionLog.class));
schedulingAgent.doSchedule(connectable, lifecycleState);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index 0e931f7497..51b0b91e31 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -23,6 +23,7 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.GarbageCollectionLog;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConnectableTask {
@@ -56,6 +58,8 @@ public class TestConnectableTask {
final RepositoryContext repoContext =
Mockito.mock(StandardRepositoryContext.class);
Mockito.when(repoContext.getFlowFileEventRepository()).thenReturn(Mockito.mock(FlowFileEventRepository.class));
+
when(flowController.getGarbageCollectionLog()).thenReturn(mock(GarbageCollectionLog.class));
+
final RepositoryContextFactory contextFactory =
Mockito.mock(RepositoryContextFactory.class);
Mockito.when(contextFactory.newProcessContext(Mockito.any(Connectable.class),
Mockito.any(AtomicLong.class))).thenReturn(repoContext);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 63afae2adf..bfbb7e3467 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -40,6 +40,8 @@ import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.LogComponentStatuses;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -198,8 +200,10 @@ public class StandardStatelessEngine implements
StatelessEngine {
overrideParameters(parameterContextMap, parameterValueProvider);
final List<ReportingTaskNode> reportingTaskNodes =
createReportingTasks(dataflowDefinition);
+ final StatsTracker statsTracker = new StandardStatsTracker(() -> 0, 0);
final StandardStatelessFlow dataflow = new
StandardStatelessFlow(childGroup, reportingTaskNodes,
controllerServiceProvider, processContextFactory,
- repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler, bulletinRepository,
lifecycleStateManager, componentEnableTimeout);
+ repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler, bulletinRepository,
lifecycleStateManager, componentEnableTimeout,
+ statsTracker);
if (statusTaskInterval != null) {
final LogComponentStatuses logComponentStatuses = new
LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index d94a7c25da..215a717d41 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -45,6 +45,7 @@ import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -129,6 +130,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private final LifecycleStateManager lifecycleStateManager;
private final long componentEnableTimeoutMillis;
private final List<Port> inputPorts;
+ private final StatsTracker statsTracker;
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
@@ -139,7 +141,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition dataflowDefinition,
final StatelessStateManagerProvider
stateManagerProvider, final ProcessScheduler processScheduler, final
BulletinRepository bulletinRepository,
- final LifecycleStateManager
lifecycleStateManager, final Duration componentEnableTimeout) {
+ final LifecycleStateManager
lifecycleStateManager, final Duration componentEnableTimeout, final
StatsTracker statsTracker) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
this.reportingTasks = reportingTasks;
@@ -155,6 +157,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
this.tracker = new AsynchronousCommitTracker(rootGroup);
this.lifecycleStateManager = lifecycleStateManager;
this.inputPorts = new ArrayList<>(rootGroup.getInputPorts());
+ this.statsTracker = statsTracker;
rootConnectables = new HashSet<>();
inputPortsByName = mapInputPortsToName(rootGroup);
@@ -370,9 +373,6 @@ public class StandardStatelessFlow implements
StatelessDataflow {
shutdown = true;
logger.info("Shutting down dataflow {}", rootGroup.getName());
- if (runDataflowExecutor != null) {
- runDataflowExecutor.shutdown();
- }
if (backgroundTaskExecutor != null) {
backgroundTaskExecutor.shutdown();
}
@@ -384,11 +384,12 @@ public class StandardStatelessFlow implements
StatelessDataflow {
// Wait for the graceful shutdown period for all processors to stop.
If the processors do not stop within this time,
// then interrupt them.
- if (runDataflowExecutor != null && interruptProcessors) {
+ boolean interrupt = false;
+ if (interruptProcessors) {
if (gracefulShutdownDuration.isZero()) {
logger.info("Shutting down all components immediately without
waiting for graceful shutdown period");
tracker.triggerFailureCallbacks(new TerminatedTaskException());
- runDataflowExecutor.shutdownNow();
+ interrupt = true;
} else {
final boolean gracefullyStopped =
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
if (gracefullyStopped) {
@@ -398,14 +399,22 @@ public class StandardStatelessFlow implements
StatelessDataflow {
logger.warn("{} Processors did not finish running within
the graceful shutdown period of {} millis. Interrupting all running components.
Processors still running: {}",
runningProcessors.size(),
gracefulShutdownDuration.toMillis(), runningProcessors);
tracker.triggerFailureCallbacks(new
TerminatedTaskException());
- runDataflowExecutor.shutdownNow();
+ interrupt = true;
}
}
- } else if (runDataflowExecutor != null) {
+ } else {
waitForProcessorThreadsToComplete(runningProcessors,
gracefulShutdownDuration);
tracker.triggerCallbacks();
}
+ if (runDataflowExecutor != null) {
+ if (interrupt) {
+ runDataflowExecutor.shutdownNow();
+ } else {
+ runDataflowExecutor.shutdown();
+ }
+ }
+
// Stop components but do not trigger @OnUnscheduled because those
were already triggered.
final CompletableFuture<Void> stopFuture =
rootGroup.stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ONSTOPPED);
try {
@@ -662,6 +671,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
final StatelessFlowCurrent current = new
StandardStatelessFlowCurrent.Builder()
.commitTracker(tracker)
+ .statsTracker(statsTracker)
.executionProgress(executionProgress)
.processContextFactory(processContextFactory)
.repositoryContextFactory(repositoryContextFactory)
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 3b0d998f19..a6912bff15 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -25,6 +25,8 @@ import
org.apache.nifi.connectable.ConnectionUtils.FlowFileCloneResult;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.TrackedStats;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.flowfile.FlowFile;
@@ -50,7 +52,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
private static final Logger logger =
LoggerFactory.getLogger(StandardStatelessFlowCurrent.class);
private final TransactionThresholdMeter transactionThresholdMeter;
- private final AsynchronousCommitTracker tracker;
+ private final AsynchronousCommitTracker commitTracker;
private final ExecutionProgress executionProgress;
private final Set<Connectable> rootConnectables;
private final FlowFileSupplier flowFileSupplier;
@@ -59,11 +61,12 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
private final RepositoryContextFactory repositoryContextFactory;
private final ProcessContextFactory processContextFactory;
private final LifecycleStateManager lifecycleStateManager;
+ private final StatsTracker statsTracker;
private StandardStatelessFlowCurrent(final Builder builder) {
this.transactionThresholdMeter = builder.transactionThresholdMeter;
- this.tracker = builder.tracker;
+ this.commitTracker = builder.commitTracker;
this.executionProgress = builder.executionProgress;
this.rootConnectables = builder.rootConnectables;
this.flowFileSupplier = builder.flowFileSupplier;
@@ -72,6 +75,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
this.repositoryContextFactory = builder.repositoryContextFactory;
this.processContextFactory = builder.processContextFactory;
this.lifecycleStateManager = builder.lifecycleStateManager;
+ this.statsTracker = builder.statsTracker;
}
@Override
@@ -81,8 +85,8 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
while (!completionReached) {
triggerRootConnectables();
- while (tracker.isAnyReady()) {
- final Connectable connectable = tracker.getNextReady();
+ while (commitTracker.isAnyReady()) {
+ final Connectable connectable =
commitTracker.getNextReady();
logger.debug("The next ready component to be triggered:
{}", connectable);
// Continually trigger the given component as long as it
is ready to be triggered
@@ -105,11 +109,11 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
// We have reached completion if the tracker does not know of
any components ready to be triggered AND
// we have no data queued in the flow (with the exception of
Output Ports).
- completionReached = !tracker.isAnyReady() &&
isFlowQueueEmpty();
+ completionReached = !commitTracker.isAnyReady() &&
isFlowQueueEmpty();
}
} catch (final Throwable t) {
executionProgress.notifyExecutionFailed(t);
- tracker.triggerFailureCallbacks(t);
+ commitTracker.triggerFailureCallbacks(t);
throw t;
}
}
@@ -196,7 +200,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
}
final Optional<FlowFile> flowFileOptional =
flowFileSupplier.getFlowFile(inputPort.getName());
- if (!flowFileOptional.isPresent()) {
+ if (flowFileOptional.isEmpty()) {
continue;
}
@@ -209,7 +213,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
cloneResult.distributeFlowFiles();
for (final Connection connection : outputConnections) {
- tracker.addConnectable(connection.getDestination());
+ commitTracker.addConnectable(connection.getDestination());
}
}
@@ -221,32 +225,35 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
// Reset progress and trigger the component. This allows us to track
whether or not any progress was made by the given connectable
// during this invocation of its onTrigger method.
- tracker.resetProgress();
+ commitTracker.resetProgress();
+ final TrackedStats trackedStats = statsTracker.startTracking();
final StatelessProcessSessionFactory statelessSessionFactory = new
StatelessProcessSessionFactory(connectable, repositoryContextFactory,
provenanceEventRepository, processContextFactory,
- executionProgress, false, tracker);
+ executionProgress, false, commitTracker,
trackedStats.getPerformanceTracker());
lifecycleState.incrementActiveThreadCount(null);
try {
- trigger(connectable, statelessSessionFactory);
+ trigger(connectable, statelessSessionFactory, trackedStats);
} finally {
lifecycleState.decrementActiveThreadCount();
+ registerProcessEvent(connectable, 1, trackedStats);
}
// Keep track of the output of the source component so that we can
determine whether or not we've reached our transaction threshold.
-
transactionThresholdMeter.incrementFlowFiles(tracker.getFlowFilesProduced());
- transactionThresholdMeter.incrementBytes(tracker.getBytesProduced());
+
transactionThresholdMeter.incrementFlowFiles(commitTracker.getFlowFilesProduced());
+
transactionThresholdMeter.incrementBytes(commitTracker.getBytesProduced());
}
private NextConnectable triggerWhileReady(final Connectable connectable) {
final LifecycleState lifecycleState =
lifecycleStateManager.getOrRegisterLifecycleState(connectable.getIdentifier(),
true, false);
+ final TrackedStats trackedStats = statsTracker.startTracking();
final StatelessProcessSessionFactory statelessSessionFactory = new
StatelessProcessSessionFactory(connectable, repositoryContextFactory,
provenanceEventRepository, processContextFactory,
- executionProgress, false, tracker);
+ executionProgress, false, commitTracker,
trackedStats.getPerformanceTracker());
lifecycleState.incrementActiveThreadCount(null);
try {
- while (tracker.isReady(connectable)) {
+ while (commitTracker.isReady(connectable)) {
if (executionProgress.isCanceled()) {
logger.info("Dataflow was canceled so will not trigger any
more components");
return NextConnectable.NONE;
@@ -254,12 +261,12 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
// Reset progress and trigger the component. This allows us to
track whether or not any progress was made by the given connectable
// during this invocation of its onTrigger method.
- tracker.resetProgress();
- trigger(connectable, statelessSessionFactory);
+ commitTracker.resetProgress();
+ trigger(connectable, statelessSessionFactory, trackedStats);
// Check if the component made any progress or not. If so,
continue on. If not, we need to check if providing the component with
// additional input would help the component to progress or
not.
- final boolean progressed = tracker.isProgress();
+ final boolean progressed = commitTracker.isProgress();
if (progressed) {
logger.debug("{} was triggered and made progress",
connectable);
continue;
@@ -292,27 +299,22 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
return NextConnectable.NEXT_READY;
} finally {
lifecycleState.decrementActiveThreadCount();
+ registerProcessEvent(connectable, 1, trackedStats);
}
}
- private void trigger(final Connectable connectable, final
ProcessSessionFactory sessionFactory) {
+ private void trigger(final Connectable connectable, final
ProcessSessionFactory sessionFactory, final TrackedStats trackedStats) {
final ProcessContext processContext =
processContextFactory.createProcessContext(connectable);
- final long start = System.nanoTime();
-
// Trigger component
logger.debug("Triggering {}", connectable);
connectable.onTrigger(processContext, sessionFactory);
-
- final long processingNanos = System.nanoTime() - start;
- registerProcessEvent(connectable, 1, processingNanos);
}
- private void registerProcessEvent(final Connectable connectable, final int
invocations, final long processingNanos) {
+ private void registerProcessEvent(final Connectable connectable, final int
invocations, final TrackedStats trackedStats) {
try {
- final StandardFlowFileEvent procEvent = new
StandardFlowFileEvent();
- procEvent.setProcessingNanos(processingNanos);
+ final StandardFlowFileEvent procEvent = trackedStats.end();
procEvent.setInvocations(invocations);
repositoryContextFactory.getFlowFileEventRepository().updateRepository(procEvent,
connectable.getIdentifier());
} catch (final IOException e) {
@@ -330,7 +332,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
public static class Builder {
private TransactionThresholdMeter transactionThresholdMeter;
- private AsynchronousCommitTracker tracker;
+ private AsynchronousCommitTracker commitTracker;
private ExecutionProgress executionProgress;
private Set<Connectable> rootConnectables;
private Collection<Port> inputPorts;
@@ -339,15 +341,17 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
private RepositoryContextFactory repositoryContextFactory;
private ProcessContextFactory processContextFactory;
private LifecycleStateManager lifecycleStateManager;
+ private StatsTracker statsTracker;
public StandardStatelessFlowCurrent build() {
Objects.requireNonNull(transactionThresholdMeter, "Transaction
Threshold Meter must be set");
- Objects.requireNonNull(tracker, "Commit Tracker must be set");
+ Objects.requireNonNull(commitTracker, "Commit Tracker must be
set");
Objects.requireNonNull(executionProgress, "Execution Progress must
be set");
Objects.requireNonNull(rootConnectables, "Root Conectables must be
set");
Objects.requireNonNull(repositoryContextFactory, "Repository
Context Factory must be set");
Objects.requireNonNull(provenanceEventRepository, "Provenance
Event Repository must be set");
Objects.requireNonNull(processContextFactory, "Process Context
Factory must be set");
+ Objects.requireNonNull(statsTracker, "Stats Tracker must be set");
return new StandardStatelessFlowCurrent(this);
}
@@ -363,7 +367,12 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
}
public Builder commitTracker(final AsynchronousCommitTracker
commitTracker) {
- this.tracker = commitTracker;
+ this.commitTracker = commitTracker;
+ return this;
+ }
+
+ public Builder statsTracker(final StatsTracker statsTracker) {
+ this.statsTracker = statsTracker;
return this;
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
index 68184e9e57..cd42e160d2 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
@@ -21,6 +21,8 @@ import
org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import
org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
@@ -30,10 +32,12 @@ import java.util.List;
public class StatelessContentClaimWriteCache implements ContentClaimWriteCache
{
private final ContentRepository contentRepository;
+ private final PerformanceTracker performanceTracker;
private final List<OutputStream> writtenTo = new ArrayList<>();
- public StatelessContentClaimWriteCache(final ContentRepository
contentRepository) {
+ public StatelessContentClaimWriteCache(final ContentRepository
contentRepository, final PerformanceTracker performanceTracker) {
this.contentRepository = contentRepository;
+ this.performanceTracker = performanceTracker;
}
@Override
@@ -56,7 +60,8 @@ public class StatelessContentClaimWriteCache implements
ContentClaimWriteCache {
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
- final OutputStream out = contentRepository.write(claim);
+ final OutputStream rawOut = contentRepository.write(claim);
+ final OutputStream out = new PerformanceTrackingOutputStream(rawOut,
performanceTracker);
writtenTo.add(out);
return out;
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
index 841c029ddf..a354ddec95 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
@@ -32,17 +32,17 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import java.util.concurrent.atomic.AtomicLong;
public class StatelessRepositoryContext extends AbstractRepositoryContext
implements RepositoryContext {
- private final ContentClaimWriteCache writeCache;
+ private final ContentRepository contentRepository;
public StatelessRepositoryContext(final Connectable connectable, final
AtomicLong connectionIndex, final ContentRepository contentRepository, final
FlowFileRepository flowFileRepository,
final FlowFileEventRepository
flowFileEventRepository, final CounterRepository counterRepository, final
ProvenanceEventRepository provenanceRepository,
final StateManager stateManager) {
super(connectable, connectionIndex, contentRepository,
flowFileRepository, flowFileEventRepository, counterRepository,
provenanceRepository, stateManager);
- writeCache = new StatelessContentClaimWriteCache(contentRepository);
+ this.contentRepository = contentRepository;
}
@Override
public ContentClaimWriteCache createContentClaimWriteCache(final
PerformanceTracker performanceTracker) {
- return writeCache;
+ return new StatelessContentClaimWriteCache(contentRepository,
performanceTracker);
}
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
index 7695043f36..566831b4ba 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
@@ -24,7 +24,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.StandardProcessSession;
-import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
@@ -53,22 +53,25 @@ public class StatelessProcessSession extends
StandardProcessSession {
private final ProcessContextFactory processContextFactory;
private final ProvenanceEventRepository provenanceEventRepository;
private final ExecutionProgress executionProgress;
- private final AsynchronousCommitTracker tracker;
+ private final AsynchronousCommitTracker commitTracker;
+ private final PerformanceTracker performanceTracker;
private boolean requireSynchronousCommits;
public StatelessProcessSession(final Connectable connectable, final
RepositoryContextFactory repositoryContextFactory,
final ProvenanceEventRepository
provenanceEventRepository, final ProcessContextFactory processContextFactory,
- final ExecutionProgress progress, final
boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
+ final ExecutionProgress progress, final
boolean requireSynchronousCommits, final AsynchronousCommitTracker
commitTracker,
+ final PerformanceTracker
performanceTracker) {
- super(repositoryContextFactory.createRepositoryContext(connectable,
provenanceEventRepository), progress::isCanceled, new NopPerformanceTracker());
+ super(repositoryContextFactory.createRepositoryContext(connectable,
provenanceEventRepository), progress::isCanceled, performanceTracker);
this.connectable = connectable;
this.repositoryContextFactory = repositoryContextFactory;
this.provenanceEventRepository = provenanceEventRepository;
this.processContextFactory = processContextFactory;
this.executionProgress = progress;
this.requireSynchronousCommits = requireSynchronousCommits;
- this.tracker = tracker;
+ this.commitTracker = commitTracker;
+ this.performanceTracker = performanceTracker;
}
@Override
@@ -94,7 +97,7 @@ public class StatelessProcessSession extends
StandardProcessSession {
// If we don't require synchronous commits, we can trigger the async
commit, but we can't call the callback yet, because we only can call the
success callback when we've completed the
// dataflow in order to ensure that we don't destroy data in a way
that it can't be replayed if the downstream processors fail.
if (!requireSynchronousCommits) {
- tracker.addCallback(connectable, onSuccess, onFailure, this);
+ commitTracker.addCallback(connectable, onSuccess, onFailure, this);
super.commitAsync();
return;
}
@@ -132,7 +135,7 @@ public class StatelessProcessSession extends
StandardProcessSession {
// Check if the Processor made any progress or not. If so, record this
fact so that the framework knows that this was the case.
final int flowFileCounts = checkpoint.getFlowFilesIn() +
checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved();
if (flowFileCounts > 0) {
- tracker.recordProgress(checkpoint.getFlowFilesOut() +
checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() +
checkpoint.getBytesRemoved());
+ commitTracker.recordProgress(checkpoint.getFlowFilesOut() +
checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() +
checkpoint.getBytesRemoved());
}
// Commit the session
@@ -216,7 +219,7 @@ public class StatelessProcessSession extends
StandardProcessSession {
continue;
}
- tracker.addConnectable(connectable);
+ commitTracker.addConnectable(connectable);
}
}
@@ -252,7 +255,8 @@ public class StatelessProcessSession extends
StandardProcessSession {
final ProcessContext connectableContext =
processContextFactory.createProcessContext(connectable);
final ProcessSessionFactory connectableSessionFactory = new
StatelessProcessSessionFactory(connectable, repositoryContextFactory,
provenanceEventRepository,
- processContextFactory, executionProgress,
requireSynchronousCommits, new
AsynchronousCommitTracker(tracker.getRootGroup()));
+ processContextFactory, executionProgress,
requireSynchronousCommits,
+ new AsynchronousCommitTracker(commitTracker.getRootGroup()),
performanceTracker);
logger.debug("Triggering {}", connectable);
final long start = System.nanoTime();
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
index 179cd35c28..056c699e94 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
@@ -18,6 +18,7 @@
package org.apache.nifi.stateless.session;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -32,24 +33,27 @@ public class StatelessProcessSessionFactory implements
ProcessSessionFactory {
private final ProcessContextFactory processContextFactory;
private final ExecutionProgress executionProgress;
private final boolean requireSynchronousCommits;
- private final AsynchronousCommitTracker tracker;
+ private final AsynchronousCommitTracker commitTracker;
+ private final PerformanceTracker performanceTracker;
public StatelessProcessSessionFactory(final Connectable connectable, final
RepositoryContextFactory contextFactory,
final ProvenanceEventRepository
provenanceEventRepository, final ProcessContextFactory processContextFactory,
- final ExecutionProgress
executionProgress, final boolean requireSynchronousCommits, final
AsynchronousCommitTracker tracker) {
+ final ExecutionProgress
executionProgress, final boolean requireSynchronousCommits, final
AsynchronousCommitTracker commitTracker,
+ final PerformanceTracker
performanceTracker) {
this.connectable = connectable;
this.contextFactory = contextFactory;
this.provenanceEventRepository = provenanceEventRepository;
this.processContextFactory = processContextFactory;
this.executionProgress = executionProgress;
this.requireSynchronousCommits = requireSynchronousCommits;
- this.tracker = tracker;
+ this.commitTracker = commitTracker;
+ this.performanceTracker = performanceTracker;
}
@Override
public ProcessSession createSession() {
final StatelessProcessSession session = new
StatelessProcessSession(connectable, contextFactory, provenanceEventRepository,
processContextFactory, executionProgress,
- requireSynchronousCommits, tracker);
+ requireSynchronousCommits, commitTracker, performanceTracker);
executionProgress.registerCreatedSession(session);
return session;
}