This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ba4a2  NIFI-6853 - flowFileEvent combineCounters hashmap overwritten 
- Added unit test to verify behavior of contribution
65ba4a2 is described below

commit 65ba4a2d9353b8a2038e45b1079739bf016d7b56
Author: Iván Rodriguez <[email protected]>
AuthorDate: Wed Nov 6 23:33:13 2019 -0300

    NIFI-6853 - flowFileEvent combineCounters hashmap overwritten
    - Added unit test to verify behavior of contribution
    
    This closes #3875.
    
    Signed-off-by: Mark Payne <[email protected]>
---
 .../repository/StandardProcessSession.java         | 10 ++++---
 .../repository/TestStandardProcessSession.java     | 31 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c758c3d..c930b36 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -564,7 +564,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final Map<String, Long> combined = new HashMap<>();
         combined.putAll(first);
-        combined.putAll(second);
+        second.forEach((key, value) -> combined.merge(key, value, Long::sum));
         return combined;
     }
 
@@ -3399,12 +3399,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 if (this.countersOnCommit.isEmpty()) {
                     this.countersOnCommit.putAll(session.countersOnCommit);
                 } else {
-                    session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, (v1, v2) -> v1 + v2));
+                    session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
                 }
             }
 
             if (session.immediateCounters != null) {
-                this.immediateCounters.putAll(session.immediateCounters);
+                if (this.immediateCounters.isEmpty()) {
+                    this.immediateCounters.putAll(session.immediateCounters);
+                } else {
+                    session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
+                }
             }
 
             this.deleteOnCommit.putAll(session.deleteOnCommit);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 9675613..445a48a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -343,6 +343,37 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testCombineCounters() {
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, relationship);
+        session.adjustCounter("a", 1, false);
+        session.adjustCounter("b", 3, false);
+        session.adjustCounter("a", 3, true);
+        session.adjustCounter("b", 5, true);
+        session.checkpoint();
+
+        flowFile = session.create();
+        session.transfer(flowFile, relationship);
+        session.adjustCounter("a", 1, true);
+        session.adjustCounter("b", 2, true);
+        session.commit();
+
+        context.getFlowFileEventRepository().reportTransferEvents(10L).getReportEntries().forEach((k, v) -> {
+            v.getCounters().forEach((key, value) -> {
+                if (key.equals("a")) {
+                    assertEquals(5L, (long) value);
+                }
+
+                if (key.equals("b")) {
+                    assertEquals(10L, (long) value);
+                }
+            });
+        });
+    }
+
+    @Test
     public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException {
         final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
 

Reply via email to