This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 5e0255d256 NIFI-15618: Bug fixes around handling of FlowFileActivity
with child groups and stateless groups (#10912)
5e0255d256 is described below
commit 5e0255d2567764bf33cf15b3bc421d12eb5f46ea
Author: Mark Payne <[email protected]>
AuthorDate: Wed Feb 18 10:23:01 2026 -0500
NIFI-15618: Bug fixes around handling of FlowFileActivity with child groups
and stateless groups (#10912)
---
.../mock/connector/server/ConnectorTestRunner.java | 3 +
.../server/StandardConnectorMockServer.java | 6 +
.../connector/StandardConnectorTestRunner.java | 5 +
.../connectable/ProcessGroupFlowFileActivity.java | 33 ++++-
.../ProcessGroupFlowFileActivityTest.java | 162 +++++++++++++++++++++
.../nifi/controller/tasks/StatelessFlowTask.java | 68 +++++++++
.../controller/tasks/TestStatelessFlowTask.java | 139 +++++++++++++++++-
.../nifi/stateless/flow/StatelessDataflow.java | 9 ++
.../nifi/stateless/flow/StandardStatelessFlow.java | 5 +
9 files changed, 423 insertions(+), 7 deletions(-)
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
index 22b122402d..cbe24db2aa 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java
@@ -17,6 +17,7 @@
package org.apache.nifi.mock.connector.server;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.ConnectorValueReference;
@@ -77,4 +78,6 @@ public interface ConnectorTestRunner extends Closeable {
default int getHttpPort() {
return -1;
}
+
+ List<DescribedValue> fetchAllowableValues(String stepName, String
propertyName);
}
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
index f71ab45d51..1049a6a511 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java
@@ -27,6 +27,7 @@ import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.Connector;
@@ -410,6 +411,11 @@ public class StandardConnectorMockServer implements
ConnectorMockServer {
}
}
+ @Override
+ public List<DescribedValue> fetchAllowableValues(final String stepName,
final String propertyName) {
+ return connectorNode.fetchAllowableValues(stepName, propertyName);
+ }
+
@Override
public List<ValidationResult> validate() {
final ValidationState validationState =
connectorNode.performValidation();
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
index 294fadec94..7fbfffb49e 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java
@@ -19,6 +19,7 @@ package org.apache.nifi.mock.connector;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.ConnectorValueReference;
@@ -221,6 +222,10 @@ public class StandardConnectorTestRunner implements
ConnectorTestRunner, Closeab
return mockServer.getHttpPort();
}
+ public List<DescribedValue> fetchAllowableValues(final String stepName,
final String propertyName) {
+ return mockServer.fetchAllowableValues(stepName, propertyName);
+ }
+
public static class Builder {
private String connectorClassName;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
index d8cdf5ba51..1d8449ef47 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivity.java
@@ -18,7 +18,9 @@
package org.apache.nifi.connectable;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,14 +90,24 @@ public class ProcessGroupFlowFileActivity implements
FlowFileActivity {
}
// Check stateless group node if present
- group.getStatelessGroupNode().ifPresent(statelessGroupNode -> {
- final OptionalLong activityTime =
statelessGroupNode.getFlowFileActivity().getLatestActivityTime();
+ final Optional<StatelessGroupNode> statelessGroupNode =
group.getStatelessGroupNode();
+ if (statelessGroupNode.isPresent()) {
+ final OptionalLong activityTime =
statelessGroupNode.get().getFlowFileActivity().getLatestActivityTime();
activityTime.ifPresent(time -> {
if (time > latestActivityTime.get()) {
latestActivityTime.set(time);
}
});
- });
+ }
+
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ final OptionalLong activityTime =
childGroup.getFlowFileActivity().getLatestActivityTime();
+ activityTime.ifPresent(time -> {
+ if (time > latestActivityTime.get()) {
+ latestActivityTime.set(time);
+ }
+ });
+ }
final long result = latestActivityTime.get();
return result == -1L ? OptionalLong.empty() : OptionalLong.of(result);
@@ -135,9 +147,18 @@ public class ProcessGroupFlowFileActivity implements
FlowFileActivity {
totalSentBytes += counts.getSentBytes();
}
- // Aggregate transfer counts from all funnels
- for (final Connectable connectable : group.getFunnels()) {
- final FlowFileTransferCounts counts =
connectable.getFlowFileActivity().getTransferCounts();
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ final FlowFileTransferCounts counts =
childGroup.getFlowFileActivity().getTransferCounts();
+ totalReceivedCount += counts.getReceivedCount();
+ totalReceivedBytes += counts.getReceivedBytes();
+ totalSentCount += counts.getSentCount();
+ totalSentBytes += counts.getSentBytes();
+ }
+
+ // Aggregate transfer counts from stateless group node if present
+ final Optional<StatelessGroupNode> statelessGroupNode =
group.getStatelessGroupNode();
+ if (statelessGroupNode.isPresent()) {
+ final FlowFileTransferCounts counts =
statelessGroupNode.get().getFlowFileActivity().getTransferCounts();
totalReceivedCount += counts.getReceivedCount();
totalReceivedBytes += counts.getReceivedBytes();
totalSentCount += counts.getSentCount();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
new file mode 100644
index 0000000000..754b955dad
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/connectable/ProcessGroupFlowFileActivityTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectable;
+
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ProcessGroupFlowFileActivityTest {
+
+ @Mock
+ private ProcessGroup processGroup;
+
+ private ProcessGroupFlowFileActivity activity;
+
+ @BeforeEach
+ void setUp() {
+ activity = new ProcessGroupFlowFileActivity(processGroup);
+
+
lenient().when(processGroup.getProcessors()).thenReturn(Collections.emptyList());
+
lenient().when(processGroup.getInputPorts()).thenReturn(Collections.emptySet());
+
lenient().when(processGroup.getOutputPorts()).thenReturn(Collections.emptySet());
+
lenient().when(processGroup.getFunnels()).thenReturn(Collections.emptySet());
+
lenient().when(processGroup.getProcessGroups()).thenReturn(Collections.emptySet());
+
lenient().when(processGroup.getStatelessGroupNode()).thenReturn(Optional.empty());
+ }
+
+ @Test
+ void testGetTransferCountsWithNoComponents() {
+ final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+ assertEquals(0L, counts.getReceivedCount());
+ assertEquals(0L, counts.getReceivedBytes());
+ assertEquals(0L, counts.getSentCount());
+ assertEquals(0L, counts.getSentBytes());
+ }
+
+ @Test
+ void testGetTransferCountsAggregatesProcessors() {
+ final ProcessorNode processor =
createMockConnectable(ProcessorNode.class, 10, 100L, 5, 50L);
+ when(processGroup.getProcessors()).thenReturn(List.of(processor));
+
+ final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+ assertEquals(10L, counts.getReceivedCount());
+ assertEquals(100L, counts.getReceivedBytes());
+ assertEquals(5L, counts.getSentCount());
+ assertEquals(50L, counts.getSentBytes());
+ }
+
+ @Test
+ void testGetTransferCountsIncludesStatelessGroupNode() {
+ final StatelessGroupNode statelessGroupNode =
mock(StatelessGroupNode.class);
+ final FlowFileActivity statelessActivity = new
ConnectableFlowFileActivity();
+ statelessActivity.updateTransferCounts(20, 2000L, 15, 1500L);
+
+
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+ final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+ assertEquals(20L, counts.getReceivedCount());
+ assertEquals(2000L, counts.getReceivedBytes());
+ assertEquals(15L, counts.getSentCount());
+ assertEquals(1500L, counts.getSentBytes());
+ }
+
+ @Test
+ void testGetTransferCountsAggregatesProcessorsAndStatelessGroupNode() {
+ final ProcessorNode processor =
createMockConnectable(ProcessorNode.class, 10, 100L, 5, 50L);
+ when(processGroup.getProcessors()).thenReturn(List.of(processor));
+
+ final StatelessGroupNode statelessGroupNode =
mock(StatelessGroupNode.class);
+ final FlowFileActivity statelessActivity = new
ConnectableFlowFileActivity();
+ statelessActivity.updateTransferCounts(20, 2000L, 15, 1500L);
+
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+ final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+ assertEquals(30L, counts.getReceivedCount());
+ assertEquals(2100L, counts.getReceivedBytes());
+ assertEquals(20L, counts.getSentCount());
+ assertEquals(1550L, counts.getSentBytes());
+ }
+
+ @Test
+ void testGetTransferCountsAggregatesChildGroups() {
+ final ProcessGroup childGroup = mock(ProcessGroup.class);
+ final FlowFileActivity childActivity = mock(FlowFileActivity.class);
+ when(childActivity.getTransferCounts()).thenReturn(new
FlowFileTransferCounts(8, 800L, 3, 300L));
+ when(childGroup.getFlowFileActivity()).thenReturn(childActivity);
+ when(processGroup.getProcessGroups()).thenReturn(Set.of(childGroup));
+
+ final FlowFileTransferCounts counts = activity.getTransferCounts();
+
+ assertEquals(8L, counts.getReceivedCount());
+ assertEquals(800L, counts.getReceivedBytes());
+ assertEquals(3L, counts.getSentCount());
+ assertEquals(300L, counts.getSentBytes());
+ }
+
+ @Test
+ void testGetLatestActivityTimeEmpty() {
+ final OptionalLong result = activity.getLatestActivityTime();
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testGetLatestActivityTimeIncludesStatelessGroupNode() {
+ final StatelessGroupNode statelessGroupNode =
mock(StatelessGroupNode.class);
+ final ConnectableFlowFileActivity statelessActivity = new
ConnectableFlowFileActivity();
+ statelessActivity.updateLatestActivityTime();
+
when(statelessGroupNode.getFlowFileActivity()).thenReturn(statelessActivity);
+
when(processGroup.getStatelessGroupNode()).thenReturn(Optional.of(statelessGroupNode));
+
+ final OptionalLong result = activity.getLatestActivityTime();
+ assertTrue(result.isPresent());
+ }
+
+ private <T extends Connectable> T createMockConnectable(final Class<T>
type,
+ final int receivedCount, final long receivedBytes, final int
sentCount, final long sentBytes) {
+ final T connectable = mock(type);
+ final FlowFileActivity connectableActivity = new
ConnectableFlowFileActivity();
+ connectableActivity.updateTransferCounts(receivedCount, receivedBytes,
sentCount, sentBytes);
+
when(connectable.getFlowFileActivity()).thenReturn(connectableActivity);
+ return connectable;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 0b251ccde4..d115b4c652 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -66,6 +66,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -229,6 +230,8 @@ public class StatelessFlowTask {
fail(successfulInvocations, statelessProvRepo, e);
}
+ updateFlowFileActivity(statelessProvRepo);
+
logger.debug("Acknowledging FlowFiles from {} invocations",
allInvocations.size());
for (final Invocation invocation : allInvocations) {
for (final PolledFlowFile polledFlowFile :
invocation.getPolledFlowFiles()) {
@@ -624,6 +627,71 @@ public class StatelessFlowTask {
}
+ /**
+ * Updates the Stateless Group Node's FlowFileActivity. The latest
activity time is obtained from the stateless flow's root group,
+ * which is updated by processors within the flow when their sessions
commit. Transfer counts are computed from provenance events
+ * generated during the stateless flow execution, mirroring the behavior
of StandardProcessSession.updateTransferCounts.
+ *
+ * @param statelessProvRepo the provenance event repository used during
stateless flow execution
+ */
+ void updateFlowFileActivity(final ProvenanceEventRepository
statelessProvRepo) {
+ final OptionalLong latestActivityTime = flow.getLatestActivityTime();
+ if (latestActivityTime.isPresent()) {
+
statelessGroupNode.getFlowFileActivity().updateLatestActivityTime();
+ }
+
+ updateTransferCounts(statelessProvRepo);
+ }
+
+ private void updateTransferCounts(final ProvenanceEventRepository
statelessProvRepo) {
+ int receivedCount = 0;
+ long receivedBytes = 0L;
+ int sentCount = 0;
+ long sentBytes = 0L;
+
+ long firstProvEventId = 0;
+ while (true) {
+ try {
+ final List<ProvenanceEventRecord> events =
statelessProvRepo.getEvents(firstProvEventId, 1000);
+ if (events.isEmpty()) {
+ break;
+ }
+
+ for (final ProvenanceEventRecord event : events) {
+ final ProvenanceEventType eventType = event.getEventType();
+ switch (eventType) {
+ case RECEIVE:
+ case CREATE:
+ receivedCount++;
+ receivedBytes += event.getFileSize();
+ break;
+ case FETCH:
+ receivedBytes += event.getFileSize();
+ break;
+ case SEND:
+ sentCount++;
+ sentBytes += event.getFileSize();
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (events.size() < 1000) {
+ break;
+ }
+ firstProvEventId += 1000;
+ } catch (final IOException e) {
+ logger.warn("Failed to obtain Provenance Events for FlowFile
Activity tracking", e);
+ break;
+ }
+ }
+
+ if (receivedCount > 0 || receivedBytes > 0L || sentCount > 0 ||
sentBytes > 0L) {
+
statelessGroupNode.getFlowFileActivity().updateTransferCounts(receivedCount,
receivedBytes, sentCount, sentBytes);
+ }
+ }
+
private void expireRecords(final FlowFileQueue sourceQueue, final
Set<FlowFileRecord> expiredRecords) throws IOException {
if (expiredRecords.isEmpty()) {
return;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
index cb2c29c711..5b6335082a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestStatelessFlowTask.java
@@ -17,8 +17,11 @@
package org.apache.nifi.controller.tasks;
+import org.apache.nifi.connectable.ConnectableFlowFileActivity;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.ConnectionUtils.FlowFileCloneResult;
+import org.apache.nifi.connectable.FlowFileActivity;
+import org.apache.nifi.connectable.FlowFileTransferCounts;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -63,6 +66,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,6 +94,8 @@ public class TestStatelessFlowTask {
private List<ProvenanceEventRecord> statelessProvenanceEvents;
private Map<String, StandardFlowFileEvent> flowFileEventsByComponentId;
private ProvenanceEventRepository statelessProvRepo;
+ private FlowFileActivity groupNodeFlowFileActivity;
+ private StatelessDataflow statelessFlow;
@BeforeEach
public void setup() throws IOException {
@@ -123,10 +129,13 @@ public class TestStatelessFlowTask {
return statelessProvenanceEvents.subList((int) startEventId, (int)
lastEvent);
}).when(statelessProvRepo).getEvents(anyLong(), anyInt());
- final StatelessDataflow statelessFlow = mock(StatelessDataflow.class);
+ statelessFlow = mock(StatelessDataflow.class);
+
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.empty());
final StatelessGroupNode statelessGroupNode =
mock(StatelessGroupNode.class);
when(statelessGroupNode.getProcessGroup()).thenReturn(rootGroup);
+ groupNodeFlowFileActivity = new ConnectableFlowFileActivity();
+
when(statelessGroupNode.getFlowFileActivity()).thenReturn(groupNodeFlowFileActivity);
final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
@@ -531,6 +540,134 @@ public class TestStatelessFlowTask {
}
+ @Test
+ public void testUpdateFlowFileActivityWithNoEvents() {
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(0L, counts.getReceivedCount());
+ assertEquals(0L, counts.getReceivedBytes());
+ assertEquals(0L, counts.getSentCount());
+ assertEquals(0L, counts.getSentBytes());
+
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isEmpty());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityCountsReceiveAndSendEvents() {
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
100L));
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
200L));
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.SEND,
50L));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(2L, counts.getReceivedCount());
+ assertEquals(300L, counts.getReceivedBytes());
+ assertEquals(1L, counts.getSentCount());
+ assertEquals(50L, counts.getSentBytes());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityCountsCreateEvents() {
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.CREATE,
75L));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(1L, counts.getReceivedCount());
+ assertEquals(75L, counts.getReceivedBytes());
+ assertEquals(0L, counts.getSentCount());
+ assertEquals(0L, counts.getSentBytes());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityCountsFetchBytesButNotCount() {
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.FETCH,
500L));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(0L, counts.getReceivedCount());
+ assertEquals(500L, counts.getReceivedBytes());
+ assertEquals(0L, counts.getSentCount());
+ assertEquals(0L, counts.getSentBytes());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityIgnoresOtherEventTypes() {
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.ATTRIBUTES_MODIFIED,
100L));
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.CONTENT_MODIFIED,
200L));
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.ROUTE,
300L));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(0L, counts.getReceivedCount());
+ assertEquals(0L, counts.getReceivedBytes());
+ assertEquals(0L, counts.getSentCount());
+ assertEquals(0L, counts.getSentBytes());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivitySetsActivityTimeFromFlow() {
+
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.of(System.currentTimeMillis()));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isPresent());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityNoActivityTimeWhenFlowReportsNone() {
+
when(statelessFlow.getLatestActivityTime()).thenReturn(OptionalLong.empty());
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
100L));
+
+ task.updateFlowFileActivity(statelessProvRepo);
+
+
assertTrue(groupNodeFlowFileActivity.getLatestActivityTime().isEmpty());
+ }
+
+ @Test
+ public void testUpdateFlowFileActivityAccumulatesAcrossMultipleCalls() {
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.RECEIVE,
100L));
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ statelessProvenanceEvents.clear();
+
statelessProvenanceEvents.add(createProvenanceEvent(ProvenanceEventType.SEND,
50L));
+ task.updateFlowFileActivity(statelessProvRepo);
+
+ final FlowFileTransferCounts counts =
groupNodeFlowFileActivity.getTransferCounts();
+ assertEquals(1L, counts.getReceivedCount());
+ assertEquals(100L, counts.getReceivedBytes());
+ assertEquals(1L, counts.getSentCount());
+ assertEquals(50L, counts.getSentBytes());
+ }
+
+ private ProvenanceEventRecord createProvenanceEvent(final
ProvenanceEventType eventType, final long fileSize) {
+ final StandardProvenanceEventRecord.Builder builder = new
StandardProvenanceEventRecord.Builder()
+ .setEventType(eventType)
+ .setComponentId("component-1")
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileUUID("uuid-1")
+ .setComponentType("Unit Test")
+ .setCurrentContentClaim(null, null, null, null, fileSize);
+
+ switch (eventType) {
+ case RECEIVE:
+ case SEND:
+ case FETCH:
+ builder.setTransitUri("http://localhost/test");
+ break;
+ case ROUTE:
+ builder.setRelationship("success");
+ break;
+ default:
+ break;
+ }
+
+ return builder.build();
+ }
+
private FlowFileRecord createFlowFile() {
final ResourceClaim resourceClaim = new
StandardResourceClaim(resourceClaimManager, "container", "section", "1", false);
final ContentClaim contentClaim = new
StandardContentClaim(resourceClaim, 0L);
diff --git
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 758b153371..66ccfb0c91 100644
---
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -111,4 +111,13 @@ public interface StatelessDataflow {
OptionalLong getCounter(String componentId, String counterName);
Map<String, Long> getCounters(Pattern counterNamePattern);
+
+ /**
+ * Returns the latest time at which any component in the dataflow had
activity, or an empty OptionalLong if there has been no activity.
+ *
+ * @return the latest activity time in milliseconds since epoch, or empty
if no activity has occurred
+ */
+ default OptionalLong getLatestActivityTime() {
+ return OptionalLong.empty();
+ }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 248683bdad..b55bf4f06e 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -965,6 +965,11 @@ public class StandardStatelessFlow implements
StatelessDataflow {
.collect(Collectors.toMap(Counter::getName, Counter::getValue));
}
+ @Override
+ public OptionalLong getLatestActivityTime() {
+ return rootGroup.getFlowFileActivity().getLatestActivityTime();
+ }
+
private String findInstanceId(final String componentId) {
return rootGroup.findAllProcessors().stream()
.filter(processor -> Objects.equals(processor.getIdentifier(),
componentId) || Objects.equals(processor.getVersionedComponentId().orElse(""),
componentId))