Repository: apex-malhar
Updated Branches:
  refs/heads/master c3d3a880d -> 2b775968a


APEXMALHAR-2309 Comparing times for newer tuples with existing key in dedup


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b775968
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b775968
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b775968

Branch: refs/heads/master
Commit: 2b775968a40853a10c487176660c813de8d36acb
Parents: c3d3a88
Author: francisf <[email protected]>
Authored: Fri Oct 21 18:38:39 2016 +0530
Committer: francisf <[email protected]>
Committed: Wed Oct 26 12:33:15 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  |  3 +-
 .../lib/dedup/DeduperTimeBasedPOJOImplTest.java | 44 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b775968/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index 0512ebd..c73cea7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -311,7 +311,8 @@ public abstract class AbstractDeduper<T>
         Future<Slice> future = waitingEvent.getValue();
         if (future.isDone() || finalize ) {
           try {
-            if (future.get() == null && asyncEvents.get(tupleKey) == null) {
+            Long asyncEventsTupleTime = asyncEvents.get(tupleKey); // time of a prior unique tuple with this key
+            if (future.get() == null && (asyncEventsTupleTime == null || asyncEventsTupleTime < tupleTime)) {
               putManagedState(tuple);
               asyncEvents.put(tupleKey, tupleTime);
               processUnique(tuple);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b775968/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
index 4b25341..f73413b 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
@@ -102,6 +102,50 @@ public class DeduperTimeBasedPOJOImplTest
     deduper.teardown();
   }
 
+  /** Key 10 reappears 70s later: it must be unique again, while its original timestamp is now expired. */
+  @Test
+  public void testDedupDifferentWindowSameKey()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    deduper.input.setup(new PortContext(attributes, context));
+    deduper.activate(context);
+    CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.unique, uniqueSink);
+    CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.duplicate, duplicateSink);
+    CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.expired, expiredSink);
+
+    deduper.beginWindow(0);
+
+    long millis = System.currentTimeMillis();
+    deduper.input.process(new TestPojo(10, new Date(millis)));
+    deduper.input.process(new TestPojo(11, new Date(millis + 10000)));
+    deduper.input.process(new TestPojo(12, new Date(millis + 20000)));
+    deduper.input.process(new TestPojo(13, new Date(millis + 30000)));
+    deduper.input.process(new TestPojo(14, new Date(millis + 40000)));
+    deduper.input.process(new TestPojo(15, new Date(millis + 50000)));
+    deduper.input.process(new TestPojo(10, new Date(millis))); // Duplicate
+    deduper.input.process(new TestPojo(16, new Date(millis + 60000)));
+    deduper.input.process(new TestPojo(10, new Date(millis + 70000))); // Same key, beyond expiry window -- Unique
+    deduper.input.process(new TestPojo(10, new Date(millis))); // Same key at the earlier time -- Expired
+    deduper.input.process(new TestPojo(10, new Date(millis + 70000))); // Later tuple repeated -- Duplicate
+
+    deduper.handleIdleTime();
+    deduper.endWindow();
+
+    Assert.assertEquals("unique tuples", 8, uniqueSink.collectedTuples.size());
+    Assert.assertEquals("duplicate tuples", 2, duplicateSink.collectedTuples.size());
+    Assert.assertEquals("expired tuples", 1, expiredSink.collectedTuples.size());
+    deduper.teardown();
+  }
+
   @After
   public void teardown()
   {

Reply via email to