Repository: apex-malhar
Updated branches: refs/heads/master c3d3a880d -> 2b775968a
APEXMALHAR-2309 Comparing times for newer tuples with existing key in dedup Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b775968 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b775968 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b775968 Branch: refs/heads/master Commit: 2b775968a40853a10c487176660c813de8d36acb Parents: c3d3a88 Author: francisf <[email protected]> Authored: Fri Oct 21 18:38:39 2016 +0530 Committer: francisf <[email protected]> Committed: Wed Oct 26 12:33:15 2016 +0530 ---------------------------------------------------------------------- .../apex/malhar/lib/dedup/AbstractDeduper.java | 3 +- .../lib/dedup/DeduperTimeBasedPOJOImplTest.java | 44 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b775968/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java index 0512ebd..c73cea7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java @@ -311,7 +311,8 @@ public abstract class AbstractDeduper<T> Future<Slice> future = waitingEvent.getValue(); if (future.isDone() || finalize ) { try { - if (future.get() == null && asyncEvents.get(tupleKey) == null) { + Long asyncEventsTupleTime = asyncEvents.get(tupleKey); + if (future.get() == null && (asyncEventsTupleTime == null || asyncEventsTupleTime < tupleTime) ) { putManagedState(tuple); asyncEvents.put(tupleKey, tupleTime); processUnique(tuple); 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b775968/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java index 4b25341..f73413b 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java @@ -102,6 +102,50 @@ public class DeduperTimeBasedPOJOImplTest deduper.teardown(); } + @Test + public void testDedupDifferentWindowSameKey() + { + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_ID, APP_ID); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class); + OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes); + deduper.setup(context); + deduper.input.setup(new PortContext(attributes, context)); + deduper.activate(context); + CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>(); + TestUtils.setSink(deduper.unique, uniqueSink); + CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>(); + TestUtils.setSink(deduper.duplicate, duplicateSink); + CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>(); + TestUtils.setSink(deduper.expired, expiredSink); + + deduper.beginWindow(0); + + long millis = System.currentTimeMillis(); + deduper.input.process( new TestPojo(10, new Date(millis))); + deduper.input.process( new TestPojo(11, new Date(millis + 10000))); + deduper.input.process( new TestPojo(12, new Date(millis + 20000))); + 
deduper.input.process( new TestPojo(13, new Date(millis + 30000))); + deduper.input.process( new TestPojo(14, new Date(millis + 40000))); + deduper.input.process( new TestPojo(15, new Date(millis + 50000))); + deduper.input.process( new TestPojo(10, new Date(millis))); //Duplicate + deduper.input.process( new TestPojo(16, new Date(millis + 60000))); + deduper.input.process( new TestPojo(10, new Date(millis + 70000))); // New tuple with same key but outside expired window. + deduper.input.process( new TestPojo(10, new Date(millis))); // Earlier tuple with earlier time -- Expired + deduper.input.process( new TestPojo(10, new Date(millis + 70000))); // New tuple repeated again - Duplicate + + deduper.handleIdleTime(); + deduper.endWindow(); + + Assert.assertTrue(uniqueSink.collectedTuples.size() == 8); + Assert.assertTrue(duplicateSink.collectedTuples.size() == 2); + Assert.assertTrue(expiredSink.collectedTuples.size() == 1); + + deduper.teardown(); + } + @After public void teardown() {
