This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new 5e0255d256 NIFI-15618: Bug fixes around handling of FlowFileActivity with child groups and stateless groups (#10912)
5e0255d256 is described below

commit 5e0255d2567764bf33cf15b3bc421d12eb5f46ea
Author: Mark Payne <[email protected]>
AuthorDate: Wed Feb 18 10:23:01 2026 -0500

    NIFI-15618: Bug fixes around handling of FlowFileActivity with child groups and stateless groups (#10912)
---
 .../mock/connector/server/ConnectorTestRunner.java |   3 +
 .../server/StandardConnectorMockServer.java        |   6 +
 .../connector/StandardConnectorTestRunner.java     |   5 +
 .../connectable/ProcessGroupFlowFileActivity.java  |  33 ++++-
 .../ProcessGroupFlowFileActivityTest.java          | 162 +++++++++++++++++++++
 .../nifi/controller/tasks/StatelessFlowTask.java   |  68 +++++++++
 .../controller/tasks/TestStatelessFlowTask.java    | 139 +++++++++++++++++-
 .../nifi/stateless/flow/StatelessDataflow.java     |   9 ++
 .../nifi/stateless/flow/StandardStatelessFlow.java |   5 +
 9 files changed, 423 insertions(+), 7 deletions(-)

diff --git 
a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
 
b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
index 22b122402d..cbe24db2aa 100644
--- 
a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
+++ 
b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.mock.connector.server;
 
+import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.connector.AssetReference;
 import org.apache.nifi.components.connector.ConnectorValueReference;
@@ -77,4 +78,6 @@ public interface ConnectorTestRunner extends Closeable {
     default int getHttpPort() {
         return -1;
     }
+
+    List<DescribedValue> fetchAllowableValues(String stepName, String 
propertyName);
 }
diff --git 
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
 
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
index f71ab45d51..1049a6a511 100644
--- 
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
+++ 
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
@@ -27,6 +27,7 @@ import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.bundle.BundleDetails;
 import org.apache.nifi.cluster.ClusterDetailsFactory;
 import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.connector.AssetReference;
 import org.apache.nifi.components.connector.Connector;
@@ -410,6 +411,11 @@ public class StandardConnectorMockServer implements 
ConnectorMockServer {
         }
     }
 
+    @Override
+    public List<DescribedValue> fetchAllowableValues(final String stepName, 
final String propertyName) {
+        return connectorNode.fetchAllowableValues(stepName, propertyName);
+    }
+
     @Override
     public List<ValidationResult> validate() {
         final ValidationState validationState = 
connectorNode.performValidation();
diff --git 
a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
 
b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
index 294fadec94..7fbfffb49e 100644
--- 
a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
+++ 
b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
@@ -19,6 +19,7 @@ package org.apache.nifi.mock.connector;
 
 import org.apache.nifi.NiFiServer;
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.connector.AssetReference;
 import org.apache.nifi.components.connector.ConnectorValueReference;
@@ -221,6 +222,10 @@ public class StandardConnectorTestRunner implements 
ConnectorTestRunner, Closeab
         return mockServer.getHttpPort();
     }
 
+    public List<DescribedValue> fetchAllowableValues(final String stepName, 
final String propertyName) {
+        return mockServer.fetchAllowableValues(stepName, propertyName);
+    }
+
 
     public static class Builder {
         private String connectorClassName;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
index d8cdf5ba51..1d8449ef47 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
@@ -18,7 +18,9 @@
 package org.apache.nifi.connectable;
 
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
 
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -88,14 +90,24 @@ public class ProcessGroupFlowFileActivity implements 
FlowFileActivity {
         }
 
         // Check stateless group node if present
-        group.getStatelessGroupNode().ifPresent(statelessGroupNode -> {
-            final OptionalLong activityTime = 
statelessGroupNode.getFlowFileActivity().getLatestActivityTime();
+        final Optional<StatelessGroupNode> statelessGroupNode = 
group.getStatelessGroupNode();
+        if (statelessGroupNode.isPresent()) {
+            final OptionalLong activityTime = 
statelessGroupNode.get().getFlowFileActivity().getLatestActivityTime();
             activityTime.ifPresent(time -> {
                 if (time > latestActivityTime.get()) {
                     latestActivityTime.set(time);
                 }
             });
-        });
+        }
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            final OptionalLong activityTime = 
childGroup.getFlowFileActivity().getLatestActivityTime();
+            activityTime.ifPresent(time -> {
+                if (time > latestActivityTime.get()) {
+                    latestActivityTime.set(time);
+                }
+            });
+        }
 
         final long result = latestActivityTime.get();
         return result == -1L ? OptionalLong.empty() : OptionalLong.of(result);
@@ -135,9 +147,18 @@ public class ProcessGroupFlowFileActivity implements 
FlowFileActivity {
             totalSentBytes += counts.getSentBytes();
         }
 
-        // Aggregate transfer counts from all funnels
-        for (final Connectable connectable : group.getFunnels()) {
-            final FlowFileTransferCounts counts = 
connectable.getFlowFileActivity().getTransferCounts();
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            final FlowFileTransferCounts counts = 
childGroup.getFlowFileActivity().getTransferCounts();
+            totalReceivedCount += counts.getReceivedCount();
+            totalReceivedBytes += counts.getReceivedBytes();
+            totalSentCount += counts.getSentCount();
+            totalSentBytes += counts.getSentBytes();
+        }
+
+        // Aggregate transfer counts from stateless group node if present
+        final Optional<StatelessGroupNode> statelessGroupNode = 
group.getStatelessGroupNode();
+        if (statelessGroupNode.isPresent()) {
+            final FlowFileTransferCounts counts = 
statelessGroupNode.get().getFlowFileActivity().getTransferCounts();
             totalReceivedCount += counts.getReceivedCount();
             totalReceivedBytes += counts.getReceivedBytes();
             totalSentCount += counts.getSentCount();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
new file mode 100644
index 0000000000..754b955dad
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectable;
+
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ProcessGroupFlowFileActivityTest {
+
+    @Mock
+    private ProcessGroup processGroup;
+
+    private ProcessGroupFlowFileActivity activity;
+
+    @BeforeEach
+    void setUp() {
+        activity = new ProcessGroupFlowFileActivity(processGroup);
+
+        
lenient().when(processGroup.getProcessors()).thenReturn(Collections.emptyList());
+        
lenient().when(processGroup.getInputPorts()).thenReturn(Collections.emptySet());
+        
lenient().when(processGroup.getOutputPorts()).thenReturn(Collections.emptySet());
+        
lenient().when(processGroup.getFunnels()).thenReturn(Collections.emptySet());
+        
lenient().when(processGroup.getProcessGroups()).thenReturn(Collections.emptySet());
+        
lenient().when(processGroup.getStatelessGroupNode()).thenReturn(Optional.empty());
+    }
+
+    @Test
+    void testGetTransferCountsWithNoComponents() {
+        final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+        assertEquals(0L, counts.getReceivedCount());
+        assertEquals(0L, counts.getReceivedBytes());
+        assertEquals(0L, counts.getSentCount());
+        assertEquals(0L, counts.getSentBytes());
+    }
+
+    @Test
+    void testGetTransferCountsAggregatesProcessors() {
+        final ProcessorNode processor = 
createMockConnectable(ProcessorNode.class, 10, 100L, 5, 50L);
+        when(processGroup.getProcessors()).thenReturn(List.of(processor));
+
+        final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+        assertEquals(10L, counts.getReceivedCount());
+        assertEquals(100L, counts.getReceivedBytes());
+        assertEquals(5L, counts.getSentCount());
+        assertEquals(50L, counts.getSentBytes());
+    }
+
+    @Test
+    void testGetTransferCountsIncludesStatelessGroupNode() {
+        final StatelessGroupNode statelessGroupNode = 
mock(StatelessGroupNode.class);
+        final FlowFileActivity statelessActivity = new 
ConnectableFlowFileActivity();
+        statelessActivity.updateTransferCounts(20, 2000L, 15, 1500L);
+
+        
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+        
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+        final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+        assertEquals(20L, counts.getReceivedCount());
+        assertEquals(2000L, counts.getReceivedBytes());
+        assertEquals(15L, counts.getSentCount());
+        assertEquals(1500L, counts.getSentBytes());
+    }
+
+    @Test
+    void testGetTransferCountsAggregatesProcessorsAndStatelessGroupNode() {
+        final ProcessorNode processor = 
createMockConnectable(ProcessorNode.class, 10, 100L, 5, 50L);
+        when(processGroup.getProcessors()).thenReturn(List.of(processor));
+
+        final StatelessGroupNode statelessGroupNode = 
mock(StatelessGroupNode.class);
+        final FlowFileActivity statelessActivity = new 
ConnectableFlowFileActivity();
+        statelessActivity.updateTransferCounts(20, 2000L, 15, 1500L);
+        
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+        
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+        final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+        assertEquals(30L, counts.getReceivedCount());
+        assertEquals(2100L, counts.getReceivedBytes());
+        assertEquals(20L, counts.getSentCount());
+        assertEquals(1550L, counts.getSentBytes());
+    }
+
+    @Test
+    void testGetTransferCountsAggregatesChildGroups() {
+        final ProcessGroup childGroup = mock(ProcessGroup.class);
+        final FlowFileActivity childActivity = mock(FlowFileActivity.class);
+        when(childActivity.getTransferCounts()).thenReturn(new 
FlowFileTransferCounts(8, 800L, 3, 300L));
+        when(childGroup.getFlowFileActivity()).thenReturn(childActivity);
+        when(processGroup.getProcessGroups()).thenReturn(Set.of(childGroup));
+
+        final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+        assertEquals(8L, counts.getReceivedCount());
+        assertEquals(800L, counts.getReceivedBytes());
+        assertEquals(3L, counts.getSentCount());
+        assertEquals(300L, counts.getSentBytes());
+    }
+
+    @Test
+    void testGetLatestActivityTimeEmpty() {
+        final OptionalLong result = activity.getLatestActivityTime();
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    void testGetLatestActivityTimeIncludesStatelessGroupNode() {
+        final StatelessGroupNode statelessGroupNode = 
mock(StatelessGroupNode.class);
+        final ConnectableFlowFileActivity statelessActivity = new 
ConnectableFlowFileActivity();
+        statelessActivity.updateLatestActivityTime();
+        
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+        
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+        final OptionalLong result = activity.getLatestActivityTime();
+        assertTrue(result.isPresent());
+    }
+
+    private <T extends Connectable> T createMockConnectable(final Class<T> 
type,
+            final int receivedCount, final long receivedBytes, final int 
sentCount, final long sentBytes) {
+        final T connectable = mock(type);
+        final FlowFileActivity connectableActivity = new 
ConnectableFlowFileActivity();
+        connectableActivity.updateTransferCounts(receivedCount, receivedBytes, 
sentCount, sentBytes);
+        
when(connectable.getFlowFileActivity()).thenReturn(connectableActivity);
+        return connectable;
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 0b251ccde4..d115b4c652 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -66,6 +66,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -229,6 +230,8 @@ public class StatelessFlowTask {
                 fail(successfulInvocations, statelessProvRepo, e);
             }
 
+            updateFlowFileActivity(statelessProvRepo);
+
             logger.debug("Acknowledging FlowFiles from {} invocations", 
allInvocations.size());
             for (final Invocation invocation : allInvocations) {
                 for (final PolledFlowFile polledFlowFile : 
invocation.getPolledFlowFiles()) {
@@ -624,6 +627,71 @@ public class StatelessFlowTask {
     }
 
 
+    /**
+     * Updates the Stateless Group Node's FlowFileActivity. The latest 
activity time is obtained from the stateless flow's root group,
+     * which is updated by processors within the flow when their sessions 
commit. Transfer counts are computed from provenance events
+     * generated during the stateless flow execution, mirroring the behavior 
of StandardProcessSession.updateTransferCounts.
+     *
+     * @param statelessProvRepo the provenance event repository used during 
stateless flow execution
+     */
+    void updateFlowFileActivity(final ProvenanceEventRepository 
statelessProvRepo) {
+        final OptionalLong latestActivityTime = flow.getLatestActivityTime();
+        if (latestActivityTime.isPresent()) {
+            
statelessGroupNode.getFlowFileActivity().updateLatestActivityTime();
+        }
+
+        updateTransferCounts(statelessProvRepo);
+    }
+
+    private void updateTransferCounts(final ProvenanceEventRepository 
statelessProvRepo) {
+        int receivedCount = 0;
+        long receivedBytes = 0L;
+        int sentCount = 0;
+        long sentBytes = 0L;
+
+        long firstProvEventId = 0;
+        while (true) {
+            try {
+                final List<ProvenanceEventRecord> events = 
statelessProvRepo.getEvents(firstProvEventId, 1000);
+                if (events.isEmpty()) {
+                    break;
+                }
+
+                for (final ProvenanceEventRecord event : events) {
+                    final ProvenanceEventType eventType = event.getEventType();
+                    switch (eventType) {
+                        case RECEIVE:
+                        case CREATE:
+                            receivedCount++;
+                            receivedBytes += event.getFileSize();
+                            break;
+                        case FETCH:
+                            receivedBytes += event.getFileSize();
+                            break;
+                        case SEND:
+                            sentCount++;
+                            sentBytes += event.getFileSize();
+                            break;
+                        default:
+                            break;
+                    }
+                }
+
+                if (events.size() < 1000) {
+                    break;
+                }
+                firstProvEventId += 1000;
+            } catch (final IOException e) {
+                logger.warn("Failed to obtain Provenance Events for FlowFile 
Activity tracking", e);
+                break;
+            }
+        }
+
+        if (receivedCount > 0 || receivedBytes > 0L || sentCount > 0 || 
sentBytes > 0L) {
+            
statelessGroupNode.getFlowFileActivity().updateTransferCounts(receivedCount, 
receivedBytes, sentCount, sentBytes);
+        }
+    }
+
     private void expireRecords(final FlowFileQueue sourceQueue, final 
Set<FlowFileRecord> expiredRecords) throws IOException {
         if (expiredRecords.isEmpty()) {
             return;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
index cb2c29c711..5b6335082a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
@@ -17,8 +17,11 @@
 
 package org.apache.nifi.controller.tasks;
 
+import org.apache.nifi.connectable.ConnectableFlowFileActivity;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.ConnectionUtils.FlowFileCloneResult;
+import org.apache.nifi.connectable.FlowFileActivity;
+import org.apache.nifi.connectable.FlowFileTransferCounts;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.MockFlowFileRecord;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -63,6 +66,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,6 +94,8 @@ public class TestStatelessFlowTask {
     private List<ProvenanceEventRecord> statelessProvenanceEvents;
     private Map<String, StandardFlowFileEvent> flowFileEventsByComponentId;
     private ProvenanceEventRepository statelessProvRepo;
+    private FlowFileActivity groupNodeFlowFileActivity;
+    private StatelessDataflow statelessFlow;
 
     @BeforeEach
     public void setup() throws IOException {
@@ -123,10 +129,13 @@ public class TestStatelessFlowTask {
             return statelessProvenanceEvents.subList((int) startEventId, (int) 
lastEvent);
         }).when(statelessProvRepo).getEvents(anyLong(), anyInt());
 
-        final StatelessDataflow statelessFlow = mock(StatelessDataflow.class);
+        statelessFlow = mock(StatelessDataflow.class);
+        
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.empty());
 
         final StatelessGroupNode statelessGroupNode = 
mock(StatelessGroupNode.class);
         when(statelessGroupNode.getProcessGroup()).thenReturn(rootGroup);
+        groupNodeFlowFileActivity = new ConnectableFlowFileActivity();
+        
when(statelessGroupNode.getFlowFileActivity()).thenReturn(groupNodeFlowFileActivity);
 
         final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
 
@@ -531,6 +540,134 @@ public class TestStatelessFlowTask {
     }
 
 
+    @Test
+    public void testUpdateFlowFileActivityWithNoEvents() {
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(0L, counts.getReceivedCount());
+        assertEquals(0L, counts.getReceivedBytes());
+        assertEquals(0L, counts.getSentCount());
+        assertEquals(0L, counts.getSentBytes());
+        
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isEmpty());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityCountsReceiveAndSendEvents() {
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
 100L));
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
 200L));
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.SEND, 
50L));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(2L, counts.getReceivedCount());
+        assertEquals(300L, counts.getReceivedBytes());
+        assertEquals(1L, counts.getSentCount());
+        assertEquals(50L, counts.getSentBytes());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityCountsCreateEvents() {
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.CREATE, 
75L));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(1L, counts.getReceivedCount());
+        assertEquals(75L, counts.getReceivedBytes());
+        assertEquals(0L, counts.getSentCount());
+        assertEquals(0L, counts.getSentBytes());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityCountsFetchBytesButNotCount() {
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.FETCH, 
500L));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(0L, counts.getReceivedCount());
+        assertEquals(500L, counts.getReceivedBytes());
+        assertEquals(0L, counts.getSentCount());
+        assertEquals(0L, counts.getSentBytes());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityIgnoresOtherEventTypes() {
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.ATTRIBUTES_MODIFIED,
 100L));
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.CONTENT_MODIFIED,
 200L));
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.ROUTE, 
300L));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(0L, counts.getReceivedCount());
+        assertEquals(0L, counts.getReceivedBytes());
+        assertEquals(0L, counts.getSentCount());
+        assertEquals(0L, counts.getSentBytes());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivitySetsActivityTimeFromFlow() {
+        
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.of(System.currentTimeMillis()));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isPresent());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityNoActivityTimeWhenFlowReportsNone() {
+        
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.empty());
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
 100L));
+
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isEmpty());
+    }
+
+    @Test
+    public void testUpdateFlowFileActivityAccumulatesAcrossMultipleCalls() {
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
 100L));
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        statelessProvenanceEvents.clear();
+        
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.SEND, 
50L));
+        task.updateFlowFileActivity(statelessProvRepo);
+
+        final FlowFileTransferCounts counts = 
groupNodeFlowFileActivity.getTransferCounts();
+        assertEquals(1L, counts.getReceivedCount());
+        assertEquals(100L, counts.getReceivedBytes());
+        assertEquals(1L, counts.getSentCount());
+        assertEquals(50L, counts.getSentBytes());
+    }
+
+    private ProvenanceEventRecord createProvenanceEvent(final 
ProvenanceEventType eventType, final long fileSize) {
+        final StandardProvenanceEventRecord.Builder builder = new 
StandardProvenanceEventRecord.Builder()
+            .setEventType(eventType)
+            .setComponentId("component-1")
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileUUID("uuid-1")
+            .setComponentType("Unit Test")
+            .setCurrentContentClaim(null, null, null, null, fileSize);
+
+        switch (eventType) {
+            case RECEIVE:
+            case SEND:
+            case FETCH:
+                builder.setTransitUri("http://localhost/test");
+                break;
+            case ROUTE:
+                builder.setRelationship("success");
+                break;
+            default:
+                break;
+        }
+
+        return builder.build();
+    }
+
     private FlowFileRecord createFlowFile() {
         final ResourceClaim resourceClaim = new 
StandardResourceClaim(resourceClaimManager, "container", "section", "1", false);
         final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
diff --git 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 758b153371..66ccfb0c91 100644
--- 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++ 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -111,4 +111,13 @@ public interface StatelessDataflow {
     OptionalLong getCounter(String componentId, String counterName);
 
     Map<String, Long> getCounters(Pattern counterNamePattern);
+
+    /**
+     * Returns the latest time at which any component in the dataflow had 
activity, or an empty OptionalLong if there has been no activity.
+     *
+     * @return the latest activity time in milliseconds since epoch, or empty 
if no activity has occurred
+     */
+    default OptionalLong getLatestActivityTime() {
+        return OptionalLong.empty();
+    }
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 248683bdad..b55bf4f06e 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -965,6 +965,11 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
             .collect(Collectors.toMap(Counter::getName, Counter::getValue));
     }
 
+    @Override
+    public OptionalLong getLatestActivityTime() {
+        return rootGroup.getFlowFileActivity().getLatestActivityTime();
+    }
+
     private String findInstanceId(final String componentId) {
         return rootGroup.findAllProcessors().stream()
             .filter(processor -> Objects.equals(processor.getIdentifier(), 
componentId) || Objects.equals(processor.getVersionedComponentId().orElse(""), 
componentId))

Reply via email to