This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a52b818fb9 NIFI-14655: Fixed Stateless Engine so that it is able to 
surface CPU Time, GC Time, etc. in Processor Stats. Do not shut down thread pool 
for stateless group until we've given processors a chance to gracefully stop; 
also ensure that even if components support batch processing that we 
immediately stop if the flow's desired state transitions to stopped
a52b818fb9 is described below

commit a52b818fb9d029dbf63a6145a7165c50f35797dd
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jun 12 15:04:08 2025 -0400

    NIFI-14655: Fixed Stateless Engine so that it is able to surface CPU Time, 
GC Time, etc. in Processor Stats
    Do not shut down thread pool for stateless group until we've given 
processors a chance to gracefully stop; also ensure that even if components 
support batch processing that we immediately stop if the flow's desired state 
transitions to stopped
    
    Signed-off-by: Joseph Witt <[email protected]>
    
    This closes #10020
---
 .../metrics/NanoTimePerformanceTracker.java        |  14 +-
 .../metrics/tracking/StandardStatsTracker.java     | 125 ++++++++++++++++
 .../repository/metrics/tracking/StatsTracker.java  |  29 ++++
 .../repository/metrics/tracking/TrackedStats.java  |  43 ++++++
 .../flow/StandardStatelessGroupNodeFactory.java    |   6 +
 .../nifi/controller/tasks/ConnectableTask.java     | 162 ++-------------------
 .../nifi/controller/tasks/StatelessFlowTask.java   |   2 +-
 .../nifi/groups/StandardStatelessGroupNode.java    |  12 +-
 .../scheduling/CronSchedulingAgentTest.java        |   3 +
 .../nifi/controller/tasks/TestConnectableTask.java |   4 +
 .../stateless/engine/StandardStatelessEngine.java  |   6 +-
 .../nifi/stateless/flow/StandardStatelessFlow.java |  26 +++-
 .../flow/StandardStatelessFlowCurrent.java         |  69 +++++----
 .../StatelessContentClaimWriteCache.java           |   9 +-
 .../repository/StatelessRepositoryContext.java     |   6 +-
 .../stateless/session/StatelessProcessSession.java |  22 +--
 .../session/StatelessProcessSessionFactory.java    |  12 +-
 17 files changed, 339 insertions(+), 211 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
index 8e9c8cc762..386ae9be22 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java
@@ -21,6 +21,7 @@ public class NanoTimePerformanceTracker implements 
PerformanceTracker {
     private final Timer contentReadTimer = new Timer();
     private final Timer contentWriteTimer = new Timer();
     private final Timer sessionCommitTimer = new Timer();
+    private int writeDepth = 0;
 
     @Override
     public void beginContentRead() {
@@ -39,12 +40,21 @@ public class NanoTimePerformanceTracker implements 
PerformanceTracker {
 
     @Override
     public void beginContentWrite() {
-        contentWriteTimer.start();
+        // Increase the write depth by 1 and start timer if the depth becomes 
1.
+        // We do this because in many cases, we may call .beginContentWrite() 
multiple times, then .endContentWrite().
+        // For example, if an OutputStream is used, calls to close() or 
write() may then call flush(), so we don't want to restart the timer on each nested call.
+        writeDepth++;
+        if (writeDepth == 1) {
+            contentWriteTimer.start();
+        }
     }
 
     @Override
     public void endContentWrite() {
-        contentWriteTimer.stop();
+        writeDepth--;
+        if (writeDepth == 0) {
+            contentWriteTimer.stop();
+        }
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
new file mode 100644
index 0000000000..d39e29e6bf
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StandardStatsTracker.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+import 
org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+
+public class StandardStatsTracker implements StatsTracker {
+    private static final ThreadMXBean threadMXBean = 
ManagementFactory.getThreadMXBean();
+
+    private final LongSupplier gcMillisTracker;
+    private final LongSupplier cpuTimeMillisTracker;
+    private final int iterationsBetweenCpuTracking;
+
+    private final AtomicLong iterations = new AtomicLong(0L);
+    private final AtomicReference<SampledCpuMetrics> sampledCpuMetrics = new 
AtomicReference<>(new SampledCpuMetrics(0L, 0L));
+
+    public StandardStatsTracker(final LongSupplier gcMillisTracker,
+                                final int expensiveMetricsTrackingPercentage) {
+
+        if (expensiveMetricsTrackingPercentage < 0) {
+            throw new IllegalArgumentException("CPU tracking percentage must 
be non-negative");
+        } else if (expensiveMetricsTrackingPercentage > 100) {
+            throw new IllegalArgumentException("CPU tracking percentage must 
be less than or equal to 100");
+        }
+
+        if (expensiveMetricsTrackingPercentage == 0) {
+            this.iterationsBetweenCpuTracking = Integer.MAX_VALUE; // effectively never 
track CPU time (iteration 0 is still sampled, since 0 % Integer.MAX_VALUE == 0)
+        } else {
+            this.iterationsBetweenCpuTracking = 100 / 
expensiveMetricsTrackingPercentage; // calculate iterations between CPU tracking
+        }
+
+        this.gcMillisTracker = gcMillisTracker;
+        this.cpuTimeMillisTracker = 
threadMXBean.isCurrentThreadCpuTimeSupported() ? 
threadMXBean::getCurrentThreadCpuTime : () -> 0L;
+    }
+
+    @Override
+    public TrackedStats startTracking() {
+        final long startTimeNanos = System.nanoTime();
+        final long startGcMillis = gcMillisTracker.getAsLong();
+        final boolean trackExpensiveMetrics = iterations.getAndIncrement() % 
iterationsBetweenCpuTracking == 0;
+        final long startCpuTimeMillis = trackExpensiveMetrics ? 
cpuTimeMillisTracker.getAsLong() : 0;
+        final PerformanceTracker performanceTracker = trackExpensiveMetrics ? 
new NanoTimePerformanceTracker() : new NopPerformanceTracker();
+
+        return new TrackedStats() {
+            @Override
+            public StandardFlowFileEvent end() {
+                final long endCpuTimeMillis = trackExpensiveMetrics ? 
cpuTimeMillisTracker.getAsLong() : 0;
+
+                final StandardFlowFileEvent event = new 
StandardFlowFileEvent();
+                event.setProcessingNanos(System.nanoTime() - startTimeNanos);
+                event.setGarbageCollectionMillis(gcMillisTracker.getAsLong() - 
startGcMillis);
+                event.setCpuNanoseconds(endCpuTimeMillis - startCpuTimeMillis);
+                
event.setContentReadNanoseconds(performanceTracker.getContentReadNanos());
+                
event.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos());
+                
event.setSessionCommitNanos(performanceTracker.getSessionCommitNanos());
+
+                if (trackExpensiveMetrics) {
+                    addSampledCpuTime(event.getProcessingNanoseconds(), 
event.getCpuNanoseconds());
+                } else {
+                    
event.setCpuNanoseconds(estimateCpuTime(event.getProcessingNanoseconds()));
+                }
+
+                return event;
+            }
+
+            @Override
+            public PerformanceTracker getPerformanceTracker() {
+                return performanceTracker;
+            }
+        };
+    }
+
+    private void addSampledCpuTime(final long processingNanoseconds, final 
long cpuNanoseconds) {
+        boolean updated = false;
+        while (!updated) {
+            final SampledCpuMetrics existingMetrics = sampledCpuMetrics.get();
+            final SampledCpuMetrics newMetrics = 
existingMetrics.add(processingNanoseconds, cpuNanoseconds);
+            updated = sampledCpuMetrics.compareAndSet(existingMetrics, 
newMetrics);
+        }
+    }
+
+    private long estimateCpuTime(final long processingNanoseconds) {
+        final SampledCpuMetrics metrics = sampledCpuMetrics.get();
+        if (metrics.processingNanos() == 0) {
+            return 0; // Avoid division by zero
+        }
+
+        // Estimate CPU time based on the ratio of sampled CPU time to 
processing time
+        return (processingNanoseconds * metrics.cpuNanos()) / 
metrics.processingNanos();
+    }
+
+    private record SampledCpuMetrics(long processingNanos, long cpuNanos) {
+
+        SampledCpuMetrics add(long processingNanos, long cpuNanos) {
+            return new SampledCpuMetrics(
+                this.processingNanos + processingNanos,
+                this.cpuNanos + cpuNanos
+            );
+        }
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
new file mode 100644
index 0000000000..6d64530503
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/StatsTracker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+public interface StatsTracker {
+
+    /**
+     * Starts tracking stats for a Processor, returning a TrackedStats object 
that can be used to capture
+     * stats when it is ended.
+     * @return a TrackedStats that can be used to capture stats when it is 
ended.
+     */
+    TrackedStats startTracking();
+
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
new file mode 100644
index 0000000000..e9a56085dd
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/tracking/TrackedStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics.tracking;
+
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface TrackedStats {
+
+    /**
+     * Ends the tracking of stats and returns a StandardFlowFileEvent that 
contains the stats.
+     * StandardFlowFileEvent is used here because it is often the case that we 
want to populate additional
+     * fields in the FlowFileEvent, and the StandardFlowFileEvent allows us to 
do this.
+     *
+     * @return a StandardFlowFileEvent that contains the stats collected 
during tracking.
+     */
+    StandardFlowFileEvent end();
+
+    /**
+     * Returns the PerformanceTracker associated with the TrackedStats so that 
it may be provided to
+     * {@link ProcessSession} etc. to gather performance metrics that are 
relevant.
+     *
+     * @return the PerformanceTracker associated with the TrackedStats
+     */
+    PerformanceTracker getPerformanceTracker();
+
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
index 3658b74763..41568ed9a2 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
@@ -28,6 +28,8 @@ import 
org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.NonPurgeableContentRepository;
 import org.apache.nifi.controller.repository.StatelessBridgeFlowFileRepository;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import 
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
 import org.apache.nifi.controller.scheduling.StatelessProcessScheduler;
 import 
org.apache.nifi.controller.scheduling.StatelessProcessSchedulerInitializationContext;
 import org.apache.nifi.engine.FlowEngine;
@@ -151,6 +153,9 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
             }
         };
 
+        final StatsTracker statsTracker = new 
StandardStatsTracker(flowController.getGarbageCollectionLog()::getTotalGarbageCollectionMillis,
+            flowController.getPerformanceTrackingPercentage());
+
         final LogRepository logRepository = 
LogRepositoryFactory.getRepository(group.getIdentifier());
         final StatelessGroupNode statelessGroupNode = new 
StandardStatelessGroupNode.Builder()
             .rootGroup(group)
@@ -165,6 +170,7 @@ public class StandardStatelessGroupNodeFactory implements 
StatelessGroupNodeFact
             .bulletinRepository(flowController.getBulletinRepository())
             .statelessGroupFactory(statelessGroupFactory)
             .lifecycleStateManager(flowController.getLifecycleStateManager())
+            .statsTracker(statsTracker)
             
.boredYieldDuration(flowController.getBoredYieldDuration(TimeUnit.MILLISECONDS),
 TimeUnit.MILLISECONDS)
             .build();
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 88841c3c73..2453415ec1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -31,10 +31,10 @@ import 
org.apache.nifi.controller.repository.RepositoryContext;
 import org.apache.nifi.controller.repository.StandardProcessSession;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
-import 
org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker;
-import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
-import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import 
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.TrackedStats;
 import 
org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
 import org.apache.nifi.controller.scheduling.LifecycleState;
 import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -53,8 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,10 +71,7 @@ public class ConnectableTask {
     private final ProcessContext processContext;
     private final FlowController flowController;
     private final int numRelationships;
-    private final ThreadMXBean threadMXBean;
-    private final AtomicLong invocations = new AtomicLong(0L);
-    private volatile SampledMetrics sampledMetrics = new SampledMetrics();
-    private final int perfTrackingNthIteration;
+    private final StatsTracker statsTracker;
 
     public ConnectableTask(final SchedulingAgent schedulingAgent, final 
Connectable connectable,
                            final FlowController flowController, final 
RepositoryContextFactory contextFactory, final LifecycleState lifecycleState) {
@@ -86,7 +81,6 @@ public class ConnectableTask {
         this.lifecycleState = lifecycleState;
         this.numRelationships = connectable.getRelationships().size();
         this.flowController = flowController;
-        this.threadMXBean = ManagementFactory.getThreadMXBean();
 
         final StateManager stateManager = new 
TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()),
 lifecycleState::isTerminated);
         if (connectable instanceof ProcessorNode) {
@@ -98,12 +92,8 @@ public class ConnectableTask {
 
         repositoryContext = contextFactory.newProcessContext(connectable, new 
AtomicLong(0L));
 
-        final int perfTrackingPercentage = 
flowController.getPerformanceTrackingPercentage();
-        if (perfTrackingPercentage == 0) {
-            perfTrackingNthIteration = 0;
-        } else {
-            perfTrackingNthIteration = 100 / perfTrackingPercentage;
-        }
+        statsTracker = new 
StandardStatsTracker(flowController.getGarbageCollectionLog()::getTotalGarbageCollectionMillis,
+            flowController.getPerformanceTrackingPercentage());
     }
 
     public Connectable getConnectable() {
@@ -197,33 +187,19 @@ public class ConnectableTask {
         }
 
         logger.debug("Triggering {}", connectable);
-        final long totalInvocationCount = invocations.getAndIncrement();
-
-        final boolean measureExpensiveMetrics = 
isMeasureExpensiveMetrics(totalInvocationCount);
-        final boolean measureCpuTime = measureExpensiveMetrics && 
threadMXBean.isCurrentThreadCpuTimeSupported();
-        final long startCpuTime;
-        final long startGcMillis;
-        if (measureCpuTime) {
-            startCpuTime = threadMXBean.getCurrentThreadCpuTime();
-            startGcMillis = 
flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
-        } else {
-            startCpuTime = 0L;
-            startGcMillis = 0L;
-        }
-
-        final PerformanceTracker performanceTracker = measureExpensiveMetrics 
? new NanoTimePerformanceTracker() : new NopPerformanceTracker();
+        final TrackedStats stats = statsTracker.startTracking();
 
         final long batchNanos = 
connectable.getRunDuration(TimeUnit.NANOSECONDS);
         final ProcessSessionFactory sessionFactory;
         final StandardProcessSession rawSession;
         final boolean batch;
         if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {
-            rawSession = new StandardProcessSession(repositoryContext, 
lifecycleState::isTerminated, performanceTracker);
+            rawSession = new StandardProcessSession(repositoryContext, 
lifecycleState::isTerminated, stats.getPerformanceTracker());
             sessionFactory = new BatchingSessionFactory(rawSession);
             batch = true;
         } else {
             rawSession = null;
-            sessionFactory = new 
StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated, 
performanceTracker);
+            sessionFactory = new 
StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated, 
stats.getPerformanceTracker());
             batch = false;
         }
 
@@ -299,7 +275,7 @@ public class ConnectableTask {
                 }
 
                 try {
-                    updateEventRepo(startNanos, startCpuTime, startGcMillis, 
invocationCount, measureCpuTime, performanceTracker);
+                    updateEventRepo(stats, invocationCount);
                 } catch (final IOException e) {
                     logger.error("Unable to update FlowFileEvent Repository 
for {}; statistics may be inaccurate.", connectable.getRunnableComponent(), e);
                 }
@@ -312,128 +288,14 @@ public class ConnectableTask {
         return InvocationResult.DO_NOT_YIELD;
     }
 
-    private void updateEventRepo(final long startNanoTime, final long 
startCpuTime, final long startGcMillis, final int invocationCount, final 
boolean measureCpuTime,
-                                 final PerformanceTracker performanceTracker)
-                throws IOException {
-        final long processingNanos = System.nanoTime() - startNanoTime;
-        final StandardFlowFileEvent flowFileEvent = new 
StandardFlowFileEvent();
-        flowFileEvent.setProcessingNanos(processingNanos);
+    private void updateEventRepo(final TrackedStats stats, final int 
invocationCount) throws IOException {
+        final StandardFlowFileEvent flowFileEvent = stats.end();
         flowFileEvent.setInvocations(invocationCount);
-
-        // We won't always measure CPU time because it's expensive to 
calculate. So when we do measure it, we keep track of
-        // total CPU nanos measured as well as total processing time for those 
iterations. This gives us a ratio of CPU time vs. total time.
-        // We can then use that to extrapolate an approximate CPU Time.
-        if (measureCpuTime) {
-            updatePerformanceTrackingMetrics(flowFileEvent, 
performanceTracker, startCpuTime, startGcMillis, processingNanos);
-        } else {
-            estimatePerformanceTrackingMetrics(flowFileEvent, processingNanos);
-        }
-
         
repositoryContext.getFlowFileEventRepository().updateRepository(flowFileEvent, 
connectable.getIdentifier());
     }
 
-    private void estimatePerformanceTrackingMetrics(final 
StandardFlowFileEvent flowFileEvent, final long processingNanos) {
-        // Use ratio of measured CPU time vs. total time for that those 
iterations, to estimate the CPU time for this iteration.
-        final SampledMetrics currentMetrics = sampledMetrics;
-        final double processingRatio = (double) processingNanos / (double) 
Math.max(1, currentMetrics.getProcessingNanosSampled());
-        flowFileEvent.setCpuNanoseconds((long) (processingRatio * 
currentMetrics.getTotalCpuNanos()));
-        flowFileEvent.setContentReadNanoseconds((long) (processingRatio * 
currentMetrics.getReadNanos()));
-        flowFileEvent.setContentWriteNanoseconds((long) (processingRatio * 
currentMetrics.getWriteNanos()));
-        flowFileEvent.setSessionCommitNanos((long) (processingRatio * 
currentMetrics.getSessionCommitNanos()));
-        flowFileEvent.setGarbageCollectionMillis((long) (processingRatio * 
currentMetrics.getGcMillis()));
-    }
-
-    private void updatePerformanceTrackingMetrics(final StandardFlowFileEvent 
flowFileEvent, final PerformanceTracker performanceTracker, final long 
startCpuTime,
-                                                  final long startGcMillis, 
final long processingNanos) {
-        final long cpuTime = threadMXBean.getCurrentThreadCpuTime();
-        final long cpuNanos = cpuTime - startCpuTime;
-
-        final long endGcMillis = 
flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
-        final long gcMillis = endGcMillis - startGcMillis;
-
-        flowFileEvent.setCpuNanoseconds(cpuNanos);
-        
flowFileEvent.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos());
-        
flowFileEvent.setContentReadNanoseconds(performanceTracker.getContentReadNanos());
-        
flowFileEvent.setSessionCommitNanos(performanceTracker.getSessionCommitNanos());
-        flowFileEvent.setGarbageCollectionMillis(gcMillis);
-
-        final SampledMetrics previousMetrics = sampledMetrics;
-        final SampledMetrics updatedMetrics = new SampledMetrics();
-        
updatedMetrics.setProcessingNanosSampled(previousMetrics.getProcessingNanosSampled()
 + processingNanos);
-        updatedMetrics.setTotalCpuNanos(previousMetrics.getTotalCpuNanos() + 
cpuNanos);
-        updatedMetrics.setReadNanos(previousMetrics.getReadNanos() + 
performanceTracker.getContentReadNanos());
-        updatedMetrics.setWriteNanos(previousMetrics.getWriteNanos() + 
performanceTracker.getContentWriteNanos());
-        
updatedMetrics.setSessionCommitNanos(previousMetrics.getSessionCommitNanos() + 
performanceTracker.getSessionCommitNanos());
-        updatedMetrics.setGcMillis(gcMillis);
-        this.sampledMetrics = updatedMetrics;
-    }
-
-    private boolean isMeasureExpensiveMetrics(final long invocationCount) {
-        if (perfTrackingNthIteration == 0) { // A value of 0 indicates we 
should never track performance metrics.
-            return false;
-        }
-
-        return invocationCount % perfTrackingNthIteration == 0;
-    }
-
     private ComponentLog getComponentLog() {
         return new SimpleProcessLogger(connectable.getIdentifier(), 
connectable.getRunnableComponent(), new StandardLoggingContext(connectable));
     }
 
-    private static class SampledMetrics {
-        private long processingNanosSampled = 0L;
-        private long totalCpuNanos = 0L;
-        private long readNanos = 0L;
-        private long writeNanos = 0L;
-        private long sessionCommitNanos = 0L;
-        private long gcMillis = 0L;
-
-        public long getProcessingNanosSampled() {
-            return processingNanosSampled;
-        }
-
-        public void setProcessingNanosSampled(final long 
processingNanosSampled) {
-            this.processingNanosSampled = processingNanosSampled;
-        }
-
-        public long getTotalCpuNanos() {
-            return totalCpuNanos;
-        }
-
-        public void setTotalCpuNanos(final long totalCpuNanos) {
-            this.totalCpuNanos = totalCpuNanos;
-        }
-
-        public long getReadNanos() {
-            return readNanos;
-        }
-
-        public void setReadNanos(final long readNanos) {
-            this.readNanos = readNanos;
-        }
-
-        public long getWriteNanos() {
-            return writeNanos;
-        }
-
-        public void setWriteNanos(final long writeNanos) {
-            this.writeNanos = writeNanos;
-        }
-
-        public long getSessionCommitNanos() {
-            return sessionCommitNanos;
-        }
-
-        public void setSessionCommitNanos(final long sessionCommitNanos) {
-            this.sessionCommitNanos = sessionCommitNanos;
-        }
-
-        public long getGcMillis() {
-            return gcMillis;
-        }
-
-        public void setGcMillis(final long gcMillis) {
-            this.gcMillis = gcMillis;
-        }
-    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 96d77aef75..5bdc42c52d 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -190,7 +190,7 @@ public class StatelessFlowTask {
 
         try {
             int invocationCount = 0;
-            while ((invocationCount == 0 || allowBatch) && 
System.currentTimeMillis() < endTime) {
+            while ((invocationCount == 0 || allowBatch) && 
statelessGroupNode.getDesiredState() == ScheduledState.RUNNING && 
System.currentTimeMillis() < endTime) {
                 invocationCount++;
 
                 final Invocation invocation = new Invocation();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 6f0f96723d..96ff6c9610 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
 import org.apache.nifi.controller.scheduling.LifecycleState;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
@@ -105,6 +106,7 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
     private final StatelessGroupFactory statelessGroupFactory;
     private final LifecycleStateManager lifecycleStateManager;
     private final FlowFileEventRepository flowFileEventRepository;
+    private final StatsTracker statsTracker;
     private final long boredYieldMillis;
 
     private volatile List<Connection> incomingConnections;
@@ -132,6 +134,7 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
         this.statelessGroupFactory = builder.statelessGroupFactory;
         this.lifecycleStateManager = builder.lifecycleStateManager;
         this.flowFileEventRepository = builder.flowFileEventRepository;
+        this.statsTracker = builder.statsTracker;
         this.boredYieldMillis = builder.boredYieldMillis;
     }
 
@@ -354,7 +357,8 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
             processScheduler,
             bulletinRepository,
             lifecycleStateManager,
-            Duration.of(10, ChronoUnit.SECONDS));
+            Duration.of(10, ChronoUnit.SECONDS),
+            statsTracker);
 
         // We don't want to enable Controller Services because we want to use 
the actual Controller Services that exist within the
         // Standard NiFi instance, not the ephemeral ones that created during 
the initialization of the Stateless Group.
@@ -922,6 +926,7 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
         private StatelessGroupFactory statelessGroupFactory;
         private LifecycleStateManager lifecycleStateManager;
         private FlowFileEventRepository flowFileEventRepository;
+        private StatsTracker statsTracker;
         private long boredYieldMillis = 10L;
 
         public Builder rootGroup(final ProcessGroup rootGroup) {
@@ -984,6 +989,11 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
             return this;
         }
 
+        public Builder statsTracker(final StatsTracker statsTracker) {
+            this.statsTracker = statsTracker;
+            return this;
+        }
+
         public Builder boredYieldDuration(final long period, final TimeUnit 
timeUnit) {
             this.boredYieldMillis = TimeUnit.MILLISECONDS.convert(period, 
timeUnit);
             return this;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
index 5f0fef1ace..3b46865ae5 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.GarbageCollectionLog;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.reporting.ReportingTask;
@@ -33,6 +34,7 @@ import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -86,6 +88,7 @@ class CronSchedulingAgentTest {
         when(connectable.getIdentifier()).thenReturn(componentId);
         
when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider);
         
when(stateManagerProvider.getStateManager(eq(componentId))).thenReturn(stateManager);
+        
when(flowController.getGarbageCollectionLog()).thenReturn(mock(GarbageCollectionLog.class));
 
         schedulingAgent.doSchedule(connectable, lifecycleState);
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index 0e931f7497..51b0b91e31 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -23,6 +23,7 @@ import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.GarbageCollectionLog;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestConnectableTask {
@@ -56,6 +58,8 @@ public class TestConnectableTask {
         final RepositoryContext repoContext = 
Mockito.mock(StandardRepositoryContext.class);
         
Mockito.when(repoContext.getFlowFileEventRepository()).thenReturn(Mockito.mock(FlowFileEventRepository.class));
 
+        
when(flowController.getGarbageCollectionLog()).thenReturn(mock(GarbageCollectionLog.class));
+
         final RepositoryContextFactory contextFactory = 
Mockito.mock(RepositoryContextFactory.class);
         
Mockito.when(contextFactory.newProcessContext(Mockito.any(Connectable.class), 
Mockito.any(AtomicLong.class))).thenReturn(repoContext);
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 63afae2adf..bfbb7e3467 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -40,6 +40,8 @@ import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.controller.reporting.LogComponentStatuses;
 import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import 
org.apache.nifi.controller.repository.metrics.tracking.StandardStatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.PropertyEncryptor;
@@ -198,8 +200,10 @@ public class StandardStatelessEngine implements 
StatelessEngine {
         overrideParameters(parameterContextMap, parameterValueProvider);
 
         final List<ReportingTaskNode> reportingTaskNodes = 
createReportingTasks(dataflowDefinition);
+        final StatsTracker statsTracker = new StandardStatsTracker(() -> 0, 0);
         final StandardStatelessFlow dataflow = new 
StandardStatelessFlow(childGroup, reportingTaskNodes, 
controllerServiceProvider, processContextFactory,
-            repositoryContextFactory, dataflowDefinition, 
stateManagerProvider, processScheduler, bulletinRepository, 
lifecycleStateManager, componentEnableTimeout);
+            repositoryContextFactory, dataflowDefinition, 
stateManagerProvider, processScheduler, bulletinRepository, 
lifecycleStateManager, componentEnableTimeout,
+            statsTracker);
 
         if (statusTaskInterval != null) {
             final LogComponentStatuses logComponentStatuses = new 
LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index d94a7c25da..215a717d41 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -45,6 +45,7 @@ import org.apache.nifi.controller.repository.RepositoryRecord;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -129,6 +130,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     private final LifecycleStateManager lifecycleStateManager;
     private final long componentEnableTimeoutMillis;
     private final List<Port> inputPorts;
+    private final StatsTracker statsTracker;
 
     private volatile ExecutorService runDataflowExecutor;
     private volatile ScheduledExecutorService backgroundTaskExecutor;
@@ -139,7 +141,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     public StandardStatelessFlow(final ProcessGroup rootGroup, final 
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider 
controllerServiceProvider,
                                  final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition dataflowDefinition,
                                  final StatelessStateManagerProvider 
stateManagerProvider, final ProcessScheduler processScheduler, final 
BulletinRepository bulletinRepository,
-                                 final LifecycleStateManager 
lifecycleStateManager, final Duration componentEnableTimeout) {
+                                 final LifecycleStateManager 
lifecycleStateManager, final Duration componentEnableTimeout, final 
StatsTracker statsTracker) {
         this.rootGroup = rootGroup;
         this.allConnections = rootGroup.findAllConnections();
         this.reportingTasks = reportingTasks;
@@ -155,6 +157,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         this.tracker = new AsynchronousCommitTracker(rootGroup);
         this.lifecycleStateManager = lifecycleStateManager;
         this.inputPorts = new ArrayList<>(rootGroup.getInputPorts());
+        this.statsTracker = statsTracker;
 
         rootConnectables = new HashSet<>();
         inputPortsByName = mapInputPortsToName(rootGroup);
@@ -370,9 +373,6 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         shutdown = true;
         logger.info("Shutting down dataflow {}", rootGroup.getName());
 
-        if (runDataflowExecutor != null) {
-            runDataflowExecutor.shutdown();
-        }
         if (backgroundTaskExecutor != null) {
             backgroundTaskExecutor.shutdown();
         }
@@ -384,11 +384,12 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
 
         // Wait for the graceful shutdown period for all processors to stop. 
If the processors do not stop within this time,
         // then interrupt them.
-        if (runDataflowExecutor != null && interruptProcessors) {
+        boolean interrupt = false;
+        if (interruptProcessors) {
             if (gracefulShutdownDuration.isZero()) {
                 logger.info("Shutting down all components immediately without 
waiting for graceful shutdown period");
                 tracker.triggerFailureCallbacks(new TerminatedTaskException());
-                runDataflowExecutor.shutdownNow();
+                interrupt = true;
             } else {
                 final boolean gracefullyStopped = 
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
                 if (gracefullyStopped) {
@@ -398,14 +399,22 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
                     logger.warn("{} Processors did not finish running within 
the graceful shutdown period of {} millis. Interrupting all running components. 
Processors still running: {}",
                         runningProcessors.size(), 
gracefulShutdownDuration.toMillis(), runningProcessors);
                     tracker.triggerFailureCallbacks(new 
TerminatedTaskException());
-                    runDataflowExecutor.shutdownNow();
+                    interrupt = true;
                 }
             }
-        } else if (runDataflowExecutor != null) {
+        } else {
             waitForProcessorThreadsToComplete(runningProcessors, 
gracefulShutdownDuration);
             tracker.triggerCallbacks();
         }
 
+        if (runDataflowExecutor != null) {
+            if (interrupt) {
+                runDataflowExecutor.shutdownNow();
+            } else {
+                runDataflowExecutor.shutdown();
+            }
+        }
+
         // Stop components but do not trigger @OnUnscheduled because those 
were already triggered.
         final CompletableFuture<Void> stopFuture = 
rootGroup.stopComponents(ProcessorStopLifecycleMethods.TRIGGER_ONSTOPPED);
         try {
@@ -662,6 +671,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
 
         final StatelessFlowCurrent current = new 
StandardStatelessFlowCurrent.Builder()
             .commitTracker(tracker)
+            .statsTracker(statsTracker)
             .executionProgress(executionProgress)
             .processContextFactory(processContextFactory)
             .repositoryContextFactory(repositoryContextFactory)
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 3b0d998f19..a6912bff15 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -25,6 +25,8 @@ import 
org.apache.nifi.connectable.ConnectionUtils.FlowFileCloneResult;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.repository.metrics.tracking.TrackedStats;
 import org.apache.nifi.controller.scheduling.LifecycleState;
 import org.apache.nifi.controller.scheduling.LifecycleStateManager;
 import org.apache.nifi.flowfile.FlowFile;
@@ -50,7 +52,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardStatelessFlowCurrent.class);
 
     private final TransactionThresholdMeter transactionThresholdMeter;
-    private final AsynchronousCommitTracker tracker;
+    private final AsynchronousCommitTracker commitTracker;
     private final ExecutionProgress executionProgress;
     private final Set<Connectable> rootConnectables;
     private final FlowFileSupplier flowFileSupplier;
@@ -59,11 +61,12 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
     private final RepositoryContextFactory repositoryContextFactory;
     private final ProcessContextFactory processContextFactory;
     private final LifecycleStateManager lifecycleStateManager;
+    private final StatsTracker statsTracker;
 
 
     private StandardStatelessFlowCurrent(final Builder builder) {
         this.transactionThresholdMeter = builder.transactionThresholdMeter;
-        this.tracker = builder.tracker;
+        this.commitTracker = builder.commitTracker;
         this.executionProgress = builder.executionProgress;
         this.rootConnectables = builder.rootConnectables;
         this.flowFileSupplier = builder.flowFileSupplier;
@@ -72,6 +75,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
         this.repositoryContextFactory = builder.repositoryContextFactory;
         this.processContextFactory = builder.processContextFactory;
         this.lifecycleStateManager = builder.lifecycleStateManager;
+        this.statsTracker = builder.statsTracker;
     }
 
     @Override
@@ -81,8 +85,8 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
             while (!completionReached) {
                 triggerRootConnectables();
 
-                while (tracker.isAnyReady()) {
-                    final Connectable connectable = tracker.getNextReady();
+                while (commitTracker.isAnyReady()) {
+                    final Connectable connectable = 
commitTracker.getNextReady();
                     logger.debug("The next ready component to be triggered: 
{}", connectable);
 
                     // Continually trigger the given component as long as it 
is ready to be triggered
@@ -105,11 +109,11 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
 
                 // We have reached completion if the tracker does not know of 
any components ready to be triggered AND
                 // we have no data queued in the flow (with the exception of 
Output Ports).
-                completionReached = !tracker.isAnyReady() && 
isFlowQueueEmpty();
+                completionReached = !commitTracker.isAnyReady() && 
isFlowQueueEmpty();
             }
         } catch (final Throwable t) {
             executionProgress.notifyExecutionFailed(t);
-            tracker.triggerFailureCallbacks(t);
+            commitTracker.triggerFailureCallbacks(t);
             throw t;
         }
     }
@@ -196,7 +200,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
             }
 
             final Optional<FlowFile> flowFileOptional = 
flowFileSupplier.getFlowFile(inputPort.getName());
-            if (!flowFileOptional.isPresent()) {
+            if (flowFileOptional.isEmpty()) {
                 continue;
             }
 
@@ -209,7 +213,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
             cloneResult.distributeFlowFiles();
 
             for (final Connection connection : outputConnections) {
-                tracker.addConnectable(connection.getDestination());
+                commitTracker.addConnectable(connection.getDestination());
             }
         }
 
@@ -221,32 +225,35 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
 
         // Reset progress and trigger the component. This allows us to track 
whether or not any progress was made by the given connectable
         // during this invocation of its onTrigger method.
-        tracker.resetProgress();
+        commitTracker.resetProgress();
 
+        final TrackedStats trackedStats = statsTracker.startTracking();
         final StatelessProcessSessionFactory statelessSessionFactory = new 
StatelessProcessSessionFactory(connectable, repositoryContextFactory, 
provenanceEventRepository, processContextFactory,
-            executionProgress, false, tracker);
+            executionProgress, false, commitTracker, 
trackedStats.getPerformanceTracker());
 
         lifecycleState.incrementActiveThreadCount(null);
         try {
-            trigger(connectable, statelessSessionFactory);
+            trigger(connectable, statelessSessionFactory, trackedStats);
         } finally {
             lifecycleState.decrementActiveThreadCount();
+            registerProcessEvent(connectable, 1, trackedStats);
         }
 
         // Keep track of the output of the source component so that we can 
determine whether or not we've reached our transaction threshold.
-        
transactionThresholdMeter.incrementFlowFiles(tracker.getFlowFilesProduced());
-        transactionThresholdMeter.incrementBytes(tracker.getBytesProduced());
+        
transactionThresholdMeter.incrementFlowFiles(commitTracker.getFlowFilesProduced());
+        
transactionThresholdMeter.incrementBytes(commitTracker.getBytesProduced());
     }
 
     private NextConnectable triggerWhileReady(final Connectable connectable) {
         final LifecycleState lifecycleState = 
lifecycleStateManager.getOrRegisterLifecycleState(connectable.getIdentifier(), 
true, false);
 
+        final TrackedStats trackedStats = statsTracker.startTracking();
         final StatelessProcessSessionFactory statelessSessionFactory = new 
StatelessProcessSessionFactory(connectable, repositoryContextFactory, 
provenanceEventRepository, processContextFactory,
-            executionProgress, false, tracker);
+            executionProgress, false, commitTracker, 
trackedStats.getPerformanceTracker());
 
         lifecycleState.incrementActiveThreadCount(null);
         try {
-            while (tracker.isReady(connectable)) {
+            while (commitTracker.isReady(connectable)) {
                 if (executionProgress.isCanceled()) {
                     logger.info("Dataflow was canceled so will not trigger any 
more components");
                     return NextConnectable.NONE;
@@ -254,12 +261,12 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
 
                 // Reset progress and trigger the component. This allows us to 
track whether or not any progress was made by the given connectable
                 // during this invocation of its onTrigger method.
-                tracker.resetProgress();
-                trigger(connectable, statelessSessionFactory);
+                commitTracker.resetProgress();
+                trigger(connectable, statelessSessionFactory, trackedStats);
 
                 // Check if the component made any progress or not. If so, 
continue on. If not, we need to check if providing the component with
                 // additional input would help the component to progress or 
not.
-                final boolean progressed = tracker.isProgress();
+                final boolean progressed = commitTracker.isProgress();
                 if (progressed) {
                     logger.debug("{} was triggered and made progress", 
connectable);
                     continue;
@@ -292,27 +299,22 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
             return NextConnectable.NEXT_READY;
         } finally {
             lifecycleState.decrementActiveThreadCount();
+            registerProcessEvent(connectable, 1, trackedStats);
         }
     }
 
 
-    private void trigger(final Connectable connectable, final 
ProcessSessionFactory sessionFactory) {
+    private void trigger(final Connectable connectable, final 
ProcessSessionFactory sessionFactory, final TrackedStats trackedStats) {
         final ProcessContext processContext = 
processContextFactory.createProcessContext(connectable);
 
-        final long start = System.nanoTime();
-
         // Trigger component
         logger.debug("Triggering {}", connectable);
         connectable.onTrigger(processContext, sessionFactory);
-
-        final long processingNanos = System.nanoTime() - start;
-        registerProcessEvent(connectable, 1, processingNanos);
     }
 
-    private void registerProcessEvent(final Connectable connectable, final int 
invocations, final long processingNanos) {
+    private void registerProcessEvent(final Connectable connectable, final int 
invocations, final TrackedStats trackedStats) {
         try {
-            final StandardFlowFileEvent procEvent = new 
StandardFlowFileEvent();
-            procEvent.setProcessingNanos(processingNanos);
+            final StandardFlowFileEvent procEvent = trackedStats.end();
             procEvent.setInvocations(invocations);
             
repositoryContextFactory.getFlowFileEventRepository().updateRepository(procEvent,
 connectable.getIdentifier());
         } catch (final IOException e) {
@@ -330,7 +332,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
 
     public static class Builder {
         private TransactionThresholdMeter transactionThresholdMeter;
-        private AsynchronousCommitTracker tracker;
+        private AsynchronousCommitTracker commitTracker;
         private ExecutionProgress executionProgress;
         private Set<Connectable> rootConnectables;
         private Collection<Port> inputPorts;
@@ -339,15 +341,17 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
         private RepositoryContextFactory repositoryContextFactory;
         private ProcessContextFactory processContextFactory;
         private LifecycleStateManager lifecycleStateManager;
+        private StatsTracker statsTracker;
 
         public StandardStatelessFlowCurrent build() {
             Objects.requireNonNull(transactionThresholdMeter, "Transaction 
Threshold Meter must be set");
-            Objects.requireNonNull(tracker, "Commit Tracker must be set");
+            Objects.requireNonNull(commitTracker, "Commit Tracker must be 
set");
             Objects.requireNonNull(executionProgress, "Execution Progress must 
be set");
             Objects.requireNonNull(rootConnectables, "Root Conectables must be 
set");
             Objects.requireNonNull(repositoryContextFactory, "Repository 
Context Factory must be set");
             Objects.requireNonNull(provenanceEventRepository, "Provenance 
Event Repository must be set");
             Objects.requireNonNull(processContextFactory, "Process Context 
Factory must be set");
+            Objects.requireNonNull(statsTracker, "Stats Tracker must be set");
 
             return new StandardStatelessFlowCurrent(this);
         }
@@ -363,7 +367,12 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
         }
 
         public Builder commitTracker(final AsynchronousCommitTracker 
commitTracker) {
-            this.tracker = commitTracker;
+            this.commitTracker = commitTracker;
+            return this;
+        }
+
+        public Builder statsTracker(final StatsTracker statsTracker) {
+            this.statsTracker = statsTracker;
             return this;
         }
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
index 68184e9e57..cd42e160d2 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessContentClaimWriteCache.java
@@ -21,6 +21,8 @@ import 
org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
+import 
org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
 import org.apache.nifi.processor.exception.ProcessException;
 
 import java.io.IOException;
@@ -30,10 +32,12 @@ import java.util.List;
 
 public class StatelessContentClaimWriteCache implements ContentClaimWriteCache 
{
     private final ContentRepository contentRepository;
+    private final PerformanceTracker performanceTracker;
     private final List<OutputStream> writtenTo = new ArrayList<>();
 
-    public StatelessContentClaimWriteCache(final ContentRepository 
contentRepository) {
+    public StatelessContentClaimWriteCache(final ContentRepository 
contentRepository, final PerformanceTracker performanceTracker) {
         this.contentRepository = contentRepository;
+        this.performanceTracker = performanceTracker;
     }
 
     @Override
@@ -56,7 +60,8 @@ public class StatelessContentClaimWriteCache implements 
ContentClaimWriteCache {
 
     @Override
     public OutputStream write(final ContentClaim claim) throws IOException {
-        final OutputStream out = contentRepository.write(claim);
+        final OutputStream rawOut = contentRepository.write(claim);
+        final OutputStream out = new PerformanceTrackingOutputStream(rawOut, 
performanceTracker);
         writtenTo.add(out);
         return out;
     }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
index 841c029ddf..a354ddec95 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
@@ -32,17 +32,17 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class StatelessRepositoryContext extends AbstractRepositoryContext 
implements RepositoryContext {
-    private final ContentClaimWriteCache writeCache;
+    private final ContentRepository contentRepository;
 
     public StatelessRepositoryContext(final Connectable connectable, final 
AtomicLong connectionIndex, final ContentRepository contentRepository, final 
FlowFileRepository flowFileRepository,
                                       final FlowFileEventRepository 
flowFileEventRepository, final CounterRepository counterRepository, final 
ProvenanceEventRepository provenanceRepository,
                                       final StateManager stateManager) {
         super(connectable, connectionIndex, contentRepository, 
flowFileRepository, flowFileEventRepository, counterRepository, 
provenanceRepository, stateManager);
-        writeCache = new StatelessContentClaimWriteCache(contentRepository);
+        this.contentRepository = contentRepository;
     }
 
     @Override
     public ContentClaimWriteCache createContentClaimWriteCache(final 
PerformanceTracker performanceTracker) {
-        return writeCache;
+        return new StatelessContentClaimWriteCache(contentRepository, 
performanceTracker);
     }
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
index 7695043f36..566831b4ba 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java
@@ -24,7 +24,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.StandardProcessSession;
-import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,22 +53,25 @@ public class StatelessProcessSession extends StandardProcessSession {
     private final ProcessContextFactory processContextFactory;
     private final ProvenanceEventRepository provenanceEventRepository;
     private final ExecutionProgress executionProgress;
-    private final AsynchronousCommitTracker tracker;
+    private final AsynchronousCommitTracker commitTracker;
+    private final PerformanceTracker performanceTracker;
 
     private boolean requireSynchronousCommits;
 
     public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory,
                                    final ProvenanceEventRepository provenanceEventRepository, final ProcessContextFactory processContextFactory,
-                                   final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
+                                   final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker commitTracker,
+                                   final PerformanceTracker performanceTracker) {
 
-        super(repositoryContextFactory.createRepositoryContext(connectable, provenanceEventRepository), progress::isCanceled, new NopPerformanceTracker());
+        super(repositoryContextFactory.createRepositoryContext(connectable, provenanceEventRepository), progress::isCanceled, performanceTracker);
         this.connectable = connectable;
         this.repositoryContextFactory = repositoryContextFactory;
         this.provenanceEventRepository = provenanceEventRepository;
         this.processContextFactory = processContextFactory;
         this.executionProgress = progress;
         this.requireSynchronousCommits = requireSynchronousCommits;
-        this.tracker = tracker;
+        this.commitTracker = commitTracker;
+        this.performanceTracker = performanceTracker;
     }
 
     @Override
@@ -94,7 +97,7 @@ public class StatelessProcessSession extends StandardProcessSession {
         // If we don't require synchronous commits, we can trigger the async commit, but we can't call the callback yet, because we only can call the success callback when we've completed the
         // dataflow in order to ensure that we don't destroy data in a way that it can't be replayed if the downstream processors fail.
         if (!requireSynchronousCommits) {
-            tracker.addCallback(connectable, onSuccess, onFailure, this);
+            commitTracker.addCallback(connectable, onSuccess, onFailure, this);
             super.commitAsync();
             return;
         }
@@ -132,7 +135,7 @@ public class StatelessProcessSession extends StandardProcessSession {
         // Check if the Processor made any progress or not. If so, record this fact so that the framework knows that this was the case.
         final int flowFileCounts = checkpoint.getFlowFilesIn() + checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved();
         if (flowFileCounts > 0) {
-            tracker.recordProgress(checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() + checkpoint.getBytesRemoved());
+            commitTracker.recordProgress(checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() + checkpoint.getBytesRemoved());
         }
 
         // Commit the session
@@ -216,7 +219,7 @@ public class StatelessProcessSession extends StandardProcessSession {
                 continue;
             }
 
-            tracker.addConnectable(connectable);
+            commitTracker.addConnectable(connectable);
         }
     }
 
@@ -252,7 +255,8 @@ public class StatelessProcessSession extends StandardProcessSession {
 
         final ProcessContext connectableContext = processContextFactory.createProcessContext(connectable);
         final ProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, provenanceEventRepository,
-            processContextFactory, executionProgress, requireSynchronousCommits, new AsynchronousCommitTracker(tracker.getRootGroup()));
+            processContextFactory, executionProgress, requireSynchronousCommits,
+            new AsynchronousCommitTracker(commitTracker.getRootGroup()), performanceTracker);
 
         logger.debug("Triggering {}", connectable);
         final long start = System.nanoTime();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
index 179cd35c28..056c699e94 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSessionFactory.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.stateless.session;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -32,24 +33,27 @@ public class StatelessProcessSessionFactory implements ProcessSessionFactory {
     private final ProcessContextFactory processContextFactory;
     private final ExecutionProgress executionProgress;
     private final boolean requireSynchronousCommits;
-    private final AsynchronousCommitTracker tracker;
+    private final AsynchronousCommitTracker commitTracker;
+    private final PerformanceTracker performanceTracker;
 
     public StatelessProcessSessionFactory(final Connectable connectable, final RepositoryContextFactory contextFactory,
                                           final ProvenanceEventRepository provenanceEventRepository, final ProcessContextFactory processContextFactory,
-                                          final ExecutionProgress executionProgress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
+                                          final ExecutionProgress executionProgress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker commitTracker,
+                                          final PerformanceTracker performanceTracker) {
         this.connectable = connectable;
         this.contextFactory = contextFactory;
         this.provenanceEventRepository = provenanceEventRepository;
         this.processContextFactory = processContextFactory;
         this.executionProgress = executionProgress;
         this.requireSynchronousCommits = requireSynchronousCommits;
-        this.tracker = tracker;
+        this.commitTracker = commitTracker;
+        this.performanceTracker = performanceTracker;
     }
 
     @Override
     public ProcessSession createSession() {
         final StatelessProcessSession session = new StatelessProcessSession(connectable, contextFactory, provenanceEventRepository, processContextFactory, executionProgress,
-            requireSynchronousCommits, tracker);
+            requireSynchronousCommits, commitTracker, performanceTracker);
         executionProgress.registerCreatedSession(session);
         return session;
     }

Reply via email to