Repository: apex-malhar
Updated Branches:
  refs/heads/master 2291d9a0d -> 8e39f8236


APEXMALHAR-2139 #resolve #comment Emit only the keys for which values changed or were added in a given window.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/76f297f2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/76f297f2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/76f297f2

Branch: refs/heads/master
Commit: 76f297f283bd5e24cd9ec7516f0363420378ec09
Parents: d713e52
Author: Aayushi Acharya <[email protected]>
Authored: Mon Jul 11 15:55:57 2016 -0700
Committer: aayushi <[email protected]>
Committed: Wed Sep 21 23:19:04 2016 -0400

----------------------------------------------------------------------
 .../com/datatorrent/lib/algo/UniqueCounter.java | 25 +++++-------
 .../datatorrent/lib/algo/UniqueCounterTest.java | 40 ++++++++++++--------
 2 files changed, 35 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/76f297f2/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
index 013c8bc..90b90a2 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
@@ -19,9 +19,7 @@
 package com.datatorrent.lib.algo;
 
 import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
+import java.util.HashSet;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -32,7 +30,7 @@ import com.datatorrent.lib.util.UnifierHashMapSumKeys;
 /**
  * This operator counts the number of times a tuple exists in a window.&nbsp;A map from tuples to counts is emitted at the end of each window.
  * <p>
- * Counts the number of times a key exists in a window; Count is emitted at end of window in a single HashMap.
+ * Counts the number of times a key exists or is added in that given window; Count is emitted for the modified or added keys at end of window in a single HashMap.
  * </p>
  * <p>
  * This is an end of window operator<br>
@@ -58,6 +56,7 @@ import com.datatorrent.lib.util.UnifierHashMapSumKeys;
 public class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
 {
   private boolean cumulative;
+  HashSet<K> inputSet = new HashSet<>();
 
   /**
    * The input port which receives incoming tuples.
@@ -70,9 +69,9 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
     @Override
     public void process(K tuple)
     {
+      inputSet.add(tuple);
       processTuple(tuple);
     }
-
   };
 
   /**
@@ -90,24 +89,20 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
   };
 
   /**
-   * Emits one HashMap as tuple
+   * Emits only the keys and values changed or added in a given window.
    */
   @Override
   public void endWindow()
   {
-    HashMap<K, Integer> tuple = null;
-    for (Map.Entry<K, MutableInt> e: map.entrySet()) {
-      if (tuple == null) {
-        tuple = new HashMap<K, Integer>();
-      }
-      tuple.put(e.getKey(), e.getValue().toInteger());
-    }
-    if (tuple != null) {
-      count.emit(tuple);
+    HashMap<K, Integer> tuple = new HashMap<>();
+    for (K key: inputSet) {
+      tuple.put(key, map.get(key).toInteger());
     }
+    count.emit(tuple);
     if (!cumulative) {
       map.clear();
     }
+    inputSet.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/76f297f2/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java
index d1fbc02..2502538 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java
@@ -48,7 +48,6 @@ public class UniqueCounterTest
     String btuple = "b";
     String ctuple = "c";
     String dtuple = "d";
-    String etuple = "e";
 
     int numTuples = 10000;
     oper.beginWindow(0);
@@ -57,28 +56,39 @@ public class UniqueCounterTest
       if (i % 2 == 0) {
         oper.data.process(btuple);
       }
-      if (i % 3 == 0) {
+      if (i % 5 == 0) {
         oper.data.process(ctuple);
       }
-      if (i % 5 == 0) {
-        oper.data.process(dtuple);
+    }
+    oper.endWindow();
+
+    oper.beginWindow(1);
+    for (int i = 0; i < numTuples; i++) {
+      oper.data.process(atuple);
+    }
+    oper.endWindow();
+
+    HashMap<String, Integer> tuple = (HashMap<String, Integer>)sink.tuple;
+    int acount = tuple.get("a");
+    Assert.assertEquals("number emitted tuples", numTuples, acount);
+
+    oper.beginWindow(2);
+    for (int i = 0; i < numTuples; i++) {
+      if (i % 2 == 0) {
+        oper.data.process(btuple);
       }
+      oper.data.process(btuple);
       if (i % 10 == 0) {
-        oper.data.process(etuple);
+        oper.data.process(dtuple);
       }
     }
     oper.endWindow();
-    HashMap<String, Integer> tuple = (HashMap<String, Integer>)sink.tuple;
-    int acount = tuple.get("a");
+
+    tuple = (HashMap<String, Integer>)sink.tuple;
     int bcount = tuple.get("b");
-    int ccount = tuple.get("c");
     int dcount = tuple.get("d");
-    int ecount = tuple.get("e");
-    Assert.assertEquals("number emitted tuples", 1, sink.count);
-    Assert.assertEquals("number emitted tuples", numTuples, acount);
-    Assert.assertEquals("number emitted tuples", numTuples / 2, bcount);
-    Assert.assertEquals("number emitted tuples", numTuples / 3 + 1, ccount);
-    Assert.assertEquals("number emitted tuples", numTuples / 5, dcount);
-    Assert.assertEquals("number emitted tuples", numTuples / 10, ecount);
+    Assert.assertEquals("number emitted tuples", 3, sink.count);
+    Assert.assertEquals("number emitted tuples", numTuples + (numTuples / 2), bcount);
+    Assert.assertEquals("number emitted tuples", numTuples / 10, dcount);
   }
 }

Reply via email to