This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 43f7845dc0f NIFI-15624 Fixed recording Gauges on Session Committed
43f7845dc0f is described below
commit 43f7845dc0f25da1c36ca80dacb04fb75603792b
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Feb 18 21:23:50 2026 -0600
NIFI-15624 Fixed recording Gauges on Session Committed
- Added handling for copying checkpointed records
- Added unit tests for record gauge method
This closes #10918.
Signed-off-by: Pierre Villard <[email protected]>
---
.../repository/StandardProcessSession.java | 9 +++++-
.../repository/StandardProcessSessionTest.java | 37 +++++++++++++++++++++-
2 files changed, 44 insertions(+), 2 deletions(-)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 6701454051b..5a996c8862f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -301,7 +301,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
this.checkpoint = new Checkpoint();
}
- if (records.isEmpty() && (countersOnCommit == null || countersOnCommit.isEmpty())) {
+ if (records.isEmpty()
+ && (countersOnCommit == null || countersOnCommit.isEmpty())
+ && (gaugeRecordsSessionCommitted == null || gaugeRecordsSessionCommitted.isEmpty())
+ ) {
LOG.trace("{} checkpointed, but no events were performed by this ProcessSession", this);
checkpoint.checkpoint(this, Collections.emptyList(), copyCollections);
return;
@@ -4021,6 +4024,10 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
+ if (session.gaugeRecordsSessionCommitted != null) {
+ this.gaugeRecordsSessionCommitted.addAll(session.gaugeRecordsSessionCommitted);
+ }
+
this.deleteOnCommit.putAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles);
this.createdFlowFiles.addAll(session.createdFlowFiles);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
index d700b941113..6c2171c3c86 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
@@ -18,11 +18,13 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.lifecycle.TaskTermination;
+import org.apache.nifi.controller.metrics.GaugeRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.provenance.InternalProvenanceReporter;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.junit.jupiter.api.BeforeEach;
@@ -63,6 +65,10 @@ class StandardProcessSessionTest {
private static final long BYTES_WRITTEN = CONTENT.length;
+ private static final String GAUGE_NAME = "freeMemory";
+
+ private static final double GAUGE_VALUE = 64.5;
+
@Mock
RepositoryContext repositoryContext;
@@ -99,6 +105,9 @@ class StandardProcessSessionTest {
@Captor
ArgumentCaptor<FlowFileEvent> flowFileEventCaptor;
+ @Captor
+ ArgumentCaptor<GaugeRecord> gaugeRecordCaptor;
+
StandardProcessSession session;
@BeforeEach
@@ -114,6 +123,7 @@ class StandardProcessSessionTest {
@Test
void testExportToPathFlowFileEventBytes() throws IOException {
setRepositoryContext();
+ when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
final Path destination = getDestination();
when(contentRepository.exportTo(isNull(), eq(destination), eq(APPEND_DISABLED), anyLong(), anyLong())).thenReturn(EXPECTED_BYTES);
@@ -129,6 +139,7 @@ class StandardProcessSessionTest {
@Test
void testExportToOutputStreamFlowFileEventBytes() throws IOException {
setRepositoryContext();
+ when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
FlowFile flowFile = session.create();
@@ -148,6 +159,31 @@ class StandardProcessSessionTest {
assertFlowFileEventMatched(BYTES_READ, BYTES_WRITTEN);
}
+ @Test
+ void testRecordGaugeNow() {
+ session.recordGauge(GAUGE_NAME, GAUGE_VALUE, CommitTiming.NOW);
+
+ verify(repositoryContext).recordGauge(gaugeRecordCaptor.capture());
+ final GaugeRecord gaugeRecord = gaugeRecordCaptor.getValue();
+
+ assertEquals(GAUGE_NAME, gaugeRecord.name());
+ assertEquals(GAUGE_VALUE, gaugeRecord.value());
+ }
+
+ @Test
+ void testRecordGaugeSessionCommitted() {
+ session.recordGauge(GAUGE_NAME, GAUGE_VALUE, CommitTiming.SESSION_COMMITTED);
+
+ setRepositoryContext();
+ session.commit();
+
+ verify(repositoryContext).recordGauge(gaugeRecordCaptor.capture());
+ final GaugeRecord gaugeRecord = gaugeRecordCaptor.getValue();
+
+ assertEquals(GAUGE_NAME, gaugeRecord.name());
+ assertEquals(GAUGE_VALUE, gaugeRecord.value());
+ }
+
private void assertFlowFileEventMatched(final long bytesRead, final long bytesWritten) throws IOException {
verify(flowFileEventRepository).updateRepository(flowFileEventCaptor.capture(), anyString());
final FlowFileEvent flowFileEvent = flowFileEventCaptor.getValue();
@@ -157,7 +193,6 @@ class StandardProcessSessionTest {
}
private void setRepositoryContext() {
- when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
when(repositoryContext.getProvenanceRepository()).thenReturn(provenanceRepository);
when(repositoryContext.getFlowFileRepository()).thenReturn(flowFileRepository);
when(repositoryContext.getFlowFileEventRepository()).thenReturn(flowFileEventRepository);