Repository: apex-malhar Updated Branches: refs/heads/master 2291d9a0d -> 8e39f8236
APEXMALHAR-2139 #resolve #comment Emit only the keys for which values changed or were added in a given window. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/76f297f2 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/76f297f2 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/76f297f2 Branch: refs/heads/master Commit: 76f297f283bd5e24cd9ec7516f0363420378ec09 Parents: d713e52 Author: Aayushi Acharya <[email protected]> Authored: Mon Jul 11 15:55:57 2016 -0700 Committer: aayushi <[email protected]> Committed: Wed Sep 21 23:19:04 2016 -0400 ---------------------------------------------------------------------- .../com/datatorrent/lib/algo/UniqueCounter.java | 25 +++++------- .../datatorrent/lib/algo/UniqueCounterTest.java | 40 ++++++++++++-------- 2 files changed, 35 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/76f297f2/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java index 013c8bc..90b90a2 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java @@ -19,9 +19,7 @@ package com.datatorrent.lib.algo; import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.mutable.MutableInt; +import java.util.HashSet; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; @@ -32,7 +30,7 @@ import com.datatorrent.lib.util.UnifierHashMapSumKeys; /** * This operator counts the number of times a tuple exists in a window. A map from tuples to counts is emitted at the end of each window. 
* <p> - * Counts the number of times a key exists in a window; Count is emitted at end of window in a single HashMap. + * Counts the number of times each key occurs in a window; at the end of the window, counts are emitted in a single HashMap for only the keys that were added or modified during that window. * </p> * <p> * This is an end of window operator<br> @@ -58,6 +56,7 @@ import com.datatorrent.lib.util.UnifierHashMapSumKeys; public class UniqueCounter<K> extends BaseUniqueKeyCounter<K> { private boolean cumulative; + HashSet<K> inputSet = new HashSet<>(); /** * The input port which receives incoming tuples. @@ -70,9 +69,9 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K> @Override public void process(K tuple) { + inputSet.add(tuple); processTuple(tuple); } - }; /** @@ -90,24 +89,20 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K> }; /** - * Emits one HashMap as tuple + * Emits, as a single HashMap, the counts of only those keys that were added or changed in the current window. */ @Override public void endWindow() { - HashMap<K, Integer> tuple = null; - for (Map.Entry<K, MutableInt> e: map.entrySet()) { - if (tuple == null) { - tuple = new HashMap<K, Integer>(); - } - tuple.put(e.getKey(), e.getValue().toInteger()); - } - if (tuple != null) { - count.emit(tuple); + HashMap<K, Integer> tuple = new HashMap<>(); + for (K key: inputSet) { + tuple.put(key, map.get(key).toInteger()); } + count.emit(tuple); if (!cumulative) { map.clear(); } + inputSet.clear(); } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/76f297f2/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java index d1fbc02..2502538 100644 --- a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java +++ b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java @@
-48,7 +48,6 @@ public class UniqueCounterTest String btuple = "b"; String ctuple = "c"; String dtuple = "d"; - String etuple = "e"; int numTuples = 10000; oper.beginWindow(0); @@ -57,28 +56,39 @@ public class UniqueCounterTest if (i % 2 == 0) { oper.data.process(btuple); } - if (i % 3 == 0) { + if (i % 5 == 0) { oper.data.process(ctuple); } - if (i % 5 == 0) { - oper.data.process(dtuple); + } + oper.endWindow(); + + oper.beginWindow(1); + for (int i = 0; i < numTuples; i++) { + oper.data.process(atuple); + } + oper.endWindow(); + + HashMap<String, Integer> tuple = (HashMap<String, Integer>)sink.tuple; + int acount = tuple.get("a"); + Assert.assertEquals("number emitted tuples", numTuples, acount); + + oper.beginWindow(2); + for (int i = 0; i < numTuples; i++) { + if (i % 2 == 0) { + oper.data.process(btuple); } + oper.data.process(btuple); if (i % 10 == 0) { - oper.data.process(etuple); + oper.data.process(dtuple); } } oper.endWindow(); - HashMap<String, Integer> tuple = (HashMap<String, Integer>)sink.tuple; - int acount = tuple.get("a"); + + tuple = (HashMap<String, Integer>)sink.tuple; int bcount = tuple.get("b"); - int ccount = tuple.get("c"); int dcount = tuple.get("d"); - int ecount = tuple.get("e"); - Assert.assertEquals("number emitted tuples", 1, sink.count); - Assert.assertEquals("number emitted tuples", numTuples, acount); - Assert.assertEquals("number emitted tuples", numTuples / 2, bcount); - Assert.assertEquals("number emitted tuples", numTuples / 3 + 1, ccount); - Assert.assertEquals("number emitted tuples", numTuples / 5, dcount); - Assert.assertEquals("number emitted tuples", numTuples / 10, ecount); + Assert.assertEquals("number emitted tuples", 3, sink.count); + Assert.assertEquals("number emitted tuples", numTuples + (numTuples / 2), bcount); + Assert.assertEquals("number emitted tuples", numTuples / 10, dcount); } }
