Repository: nifi Updated Branches: refs/heads/master a29b7b3bf -> 5061e5fa0
NIFI-1222: Session.adjustCounter keeps track of local and global counters; it then calls processContext.adjustCounter with each of them, but ProcessContext was changed a while back to automatically increment both 'local' and 'global' counters each time, so our numbers are doubled; removed the 'localCounters' and 'globalCounters' from StandardProcessSession and replaced with just 'counters' Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5061e5fa Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5061e5fa Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5061e5fa Branch: refs/heads/master Commit: 5061e5fa0a5625d1a7605a83795c5ac4cf1c5938 Parents: a29b7b3 Author: Mark Payne <[email protected]> Authored: Wed Nov 25 16:11:11 2015 -0500 Committer: joewitt <[email protected]> Committed: Wed Nov 25 16:26:59 2015 -0500 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 21 ++++++-------------- 1 file changed, 6 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5061e5fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index d447ddd..6ad65c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java 
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -103,8 +103,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>(); private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); - private final Map<String, Long> localCounters = new HashMap<>(); - private final Map<String, Long> globalCounters = new HashMap<>(); + private final Map<String, Long> counters = new HashMap<>(); private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>(); private final ProcessContext context; private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring @@ -397,11 +396,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) { - adjustCounter(entry.getKey(), entry.getValue(), true); - } - - for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) { + for (final Map.Entry<String, Long> entry : checkpoint.counters.entrySet()) { adjustCounter(entry.getKey(), entry.getValue(), true); } @@ -993,8 +988,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE connectionCounts.clear(); createdFlowFiles.clear(); removedFlowFiles.clear(); - globalCounters.clear(); - localCounters.clear(); + counters.clear(); generatedProvenanceEvents.clear(); forkEventBuilders.clear(); @@ -1167,8 +1161,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return; } - adjustCounter(name, delta, localCounters); - adjustCounter(name, 
delta, globalCounters); + adjustCounter(name, delta, counters); } private void adjustCounter(final String name, final long delta, final Map<String, Long> map) { @@ -2554,8 +2547,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>(); private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>(); private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>(); - private final Map<String, Long> localCounters = new HashMap<>(); - private final Map<String, Long> globalCounters = new HashMap<>(); + private final Map<String, Long> counters = new HashMap<>(); private final Set<Path> deleteOnCommit = new HashSet<>(); private final Set<String> removedFlowFiles = new HashSet<>(); @@ -2581,8 +2573,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.records.putAll(session.records); this.connectionCounts.putAll(session.connectionCounts); this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles); - this.localCounters.putAll(session.localCounters); - this.globalCounters.putAll(session.globalCounters); + this.counters.putAll(session.counters); this.deleteOnCommit.addAll(session.deleteOnCommit); this.removedFlowFiles.addAll(session.removedFlowFiles);
