This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 65ba4a2 NIFI-6853 - flowFileEvent combineCounters hashmap overwritten
- Added unit test to verify behavior of contribution
65ba4a2 is described below
commit 65ba4a2d9353b8a2038e45b1079739bf016d7b56
Author: Iván Rodriguez <[email protected]>
AuthorDate: Wed Nov 6 23:33:13 2019 -0300
NIFI-6853 - flowFileEvent combineCounters hashmap overwritten
- Added unit test to verify behavior of contribution
This closes #3875.
Signed-off-by: Mark Payne <[email protected]>
---
.../repository/StandardProcessSession.java | 10 ++++---
.../repository/TestStandardProcessSession.java | 31 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c758c3d..c930b36 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -564,7 +564,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
final Map<String, Long> combined = new HashMap<>();
combined.putAll(first);
- combined.putAll(second);
+ second.forEach((key, value) -> combined.merge(key, value, Long::sum));
return combined;
}
@@ -3399,12 +3399,16 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
if (this.countersOnCommit.isEmpty()) {
this.countersOnCommit.putAll(session.countersOnCommit);
} else {
- session.countersOnCommit.forEach((key, value) ->
this.countersOnCommit.merge(key, value, (v1, v2) -> v1 + v2));
+ session.countersOnCommit.forEach((key, value) ->
this.countersOnCommit.merge(key, value, Long::sum));
}
}
if (session.immediateCounters != null) {
- this.immediateCounters.putAll(session.immediateCounters);
+ if (this.immediateCounters.isEmpty()) {
+ this.immediateCounters.putAll(session.immediateCounters);
+ } else {
+ session.immediateCounters.forEach((key, value) ->
this.immediateCounters.merge(key, value, Long::sum));
+ }
}
this.deleteOnCommit.putAll(session.deleteOnCommit);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 9675613..445a48a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -343,6 +343,37 @@ public class TestStandardProcessSession {
}
@Test
+ public void testCombineCounters() {
+ final Relationship relationship = new
Relationship.Builder().name("A").build();
+
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, relationship);
+ session.adjustCounter("a", 1, false);
+ session.adjustCounter("b", 3, false);
+ session.adjustCounter("a", 3, true);
+ session.adjustCounter("b", 5, true);
+ session.checkpoint();
+
+ flowFile = session.create();
+ session.transfer(flowFile, relationship);
+ session.adjustCounter("a", 1, true);
+ session.adjustCounter("b", 2, true);
+ session.commit();
+
+
context.getFlowFileEventRepository().reportTransferEvents(10L).getReportEntries().forEach((k,
v) -> {
+ v.getCounters().forEach((key, value) -> {
+ if (key.equals("a")) {
+ assertEquals(5L, (long) value);
+ }
+
+ if (key.equals("b")) {
+ assertEquals(10L, (long) value);
+ }
+ });
+ });
+ }
+
+ @Test
public void testReadCountCorrectWhenSkippingWithReadCallback() throws
IOException {
final byte[] content = "This and that and the
other.".getBytes(StandardCharsets.UTF_8);