This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 43f7845dc0f NIFI-15624 Fixed recording Gauges on Session Committed
43f7845dc0f is described below

commit 43f7845dc0f25da1c36ca80dacb04fb75603792b
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Feb 18 21:23:50 2026 -0600

    NIFI-15624 Fixed recording Gauges on Session Committed
    
    - Added handling for copying checkpointed records
    - Added unit tests for record gauge method
    
    This closes #10918.
    
    Signed-off-by: Pierre Villard <[email protected]>
---
 .../repository/StandardProcessSession.java         |  9 +++++-
 .../repository/StandardProcessSessionTest.java     | 37 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 6701454051b..5a996c8862f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -301,7 +301,10 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
             this.checkpoint = new Checkpoint();
         }
 
-        if (records.isEmpty() && (countersOnCommit == null || 
countersOnCommit.isEmpty())) {
+        if (records.isEmpty()
+                && (countersOnCommit == null || countersOnCommit.isEmpty())
+                && (gaugeRecordsSessionCommitted == null || 
gaugeRecordsSessionCommitted.isEmpty())
+        ) {
             LOG.trace("{} checkpointed, but no events were performed by this 
ProcessSession", this);
             checkpoint.checkpoint(this, Collections.emptyList(), 
copyCollections);
             return;
@@ -4021,6 +4024,10 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
             mergeMaps(this.countersOnCommit, session.countersOnCommit, 
Long::sum);
             mergeMaps(this.immediateCounters, session.immediateCounters, 
Long::sum);
 
+            if (session.gaugeRecordsSessionCommitted != null) {
+                
this.gaugeRecordsSessionCommitted.addAll(session.gaugeRecordsSessionCommitted);
+            }
+
             this.deleteOnCommit.putAll(session.deleteOnCommit);
             this.removedFlowFiles.addAll(session.removedFlowFiles);
             this.createdFlowFiles.addAll(session.createdFlowFiles);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
index d700b941113..6c2171c3c86 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java
@@ -18,11 +18,13 @@ package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.lifecycle.TaskTermination;
+import org.apache.nifi.controller.metrics.GaugeRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
 import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.metrics.CommitTiming;
 import org.apache.nifi.provenance.InternalProvenanceReporter;
 import org.apache.nifi.provenance.ProvenanceRepository;
 import org.junit.jupiter.api.BeforeEach;
@@ -63,6 +65,10 @@ class StandardProcessSessionTest {
 
     private static final long BYTES_WRITTEN = CONTENT.length;
 
+    private static final String GAUGE_NAME = "freeMemory";
+
+    private static final double GAUGE_VALUE = 64.5;
+
     @Mock
     RepositoryContext repositoryContext;
 
@@ -99,6 +105,9 @@ class StandardProcessSessionTest {
     @Captor
     ArgumentCaptor<FlowFileEvent> flowFileEventCaptor;
 
+    @Captor
+    ArgumentCaptor<GaugeRecord> gaugeRecordCaptor;
+
     StandardProcessSession session;
 
     @BeforeEach
@@ -114,6 +123,7 @@ class StandardProcessSessionTest {
     @Test
     void testExportToPathFlowFileEventBytes() throws IOException {
         setRepositoryContext();
+        
when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
 
         final Path destination = getDestination();
         when(contentRepository.exportTo(isNull(), eq(destination), 
eq(APPEND_DISABLED), anyLong(), anyLong())).thenReturn(EXPECTED_BYTES);
@@ -129,6 +139,7 @@ class StandardProcessSessionTest {
     @Test
     void testExportToOutputStreamFlowFileEventBytes() throws IOException {
         setRepositoryContext();
+        
when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
 
         FlowFile flowFile = session.create();
 
@@ -148,6 +159,31 @@ class StandardProcessSessionTest {
         assertFlowFileEventMatched(BYTES_READ, BYTES_WRITTEN);
     }
 
+    @Test
+    void testRecordGaugeNow() {
+        session.recordGauge(GAUGE_NAME, GAUGE_VALUE, CommitTiming.NOW);
+
+        verify(repositoryContext).recordGauge(gaugeRecordCaptor.capture());
+        final GaugeRecord gaugeRecord = gaugeRecordCaptor.getValue();
+
+        assertEquals(GAUGE_NAME, gaugeRecord.name());
+        assertEquals(GAUGE_VALUE, gaugeRecord.value());
+    }
+
+    @Test
+    void testRecordGaugeSessionCommitted() {
+        session.recordGauge(GAUGE_NAME, GAUGE_VALUE, 
CommitTiming.SESSION_COMMITTED);
+
+        setRepositoryContext();
+        session.commit();
+
+        verify(repositoryContext).recordGauge(gaugeRecordCaptor.capture());
+        final GaugeRecord gaugeRecord = gaugeRecordCaptor.getValue();
+
+        assertEquals(GAUGE_NAME, gaugeRecord.name());
+        assertEquals(GAUGE_VALUE, gaugeRecord.value());
+    }
+
     private void assertFlowFileEventMatched(final long bytesRead, final long 
bytesWritten) throws IOException {
         
verify(flowFileEventRepository).updateRepository(flowFileEventCaptor.capture(), 
anyString());
         final FlowFileEvent flowFileEvent = flowFileEventCaptor.getValue();
@@ -157,7 +193,6 @@ class StandardProcessSessionTest {
     }
 
     private void setRepositoryContext() {
-        
when(repositoryContext.getContentRepository()).thenReturn(contentRepository);
         
when(repositoryContext.getProvenanceRepository()).thenReturn(provenanceRepository);
         
when(repositoryContext.getFlowFileRepository()).thenReturn(flowFileRepository);
         
when(repositoryContext.getFlowFileEventRepository()).thenReturn(flowFileEventRepository);

Reply via email to