Repository: apex-malhar
Updated Branches:
  refs/heads/master 1333910fd -> 9f04db7f6

APEXMALHAR-2273 #resolve do not fire retraction trigger if retraction value is same as current value and fireOnlyUpdatedPanes is true

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9f04db7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9f04db7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9f04db7f

Branch: refs/heads/master
Commit: 9f04db7f6d33d2bcc9ca610e130c5694b58416af
Parents: 1333910
Author: David Yan <[email protected]>
Authored: Thu Sep 29 16:35:55 2016 -0700
Committer: Siyuan Hua <[email protected]>
Committed: Mon Oct 3 14:09:46 2016 -0700

----------------------------------------------------------------------
 .../window/impl/AbstractWindowedOperator.java   |  5 +--
 .../window/impl/KeyedWindowedOperatorImpl.java  | 19 ++++++++---
 .../lib/window/impl/WindowedOperatorImpl.java   | 10 ++++--
 .../malhar/lib/window/WindowedOperatorTest.java | 36 ++++++++++++++++----
 4 files changed, 54 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index c778523..0ece11e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -526,7 +526,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   public void fireTrigger(Window window, WindowState windowState)
   {
     if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-      fireRetractionTrigger(window);
+      fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
     }
     fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
     windowState.lastTriggerFiredTime = currentDerivedTimestamp;
@@ -548,8 +548,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
    * mode is ACCUMULATING_AND_RETRACTING
    *
    * @param window the window to fire the retraction trigger on
+   * @param fireOnlyUpdatedPanes Do not fire trigger if the retraction value is the same as the new value.
    */
-  public abstract void fireRetractionTrigger(Window window);
+  public abstract void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes);
 
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index a38207a..6fab7de 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -79,7 +79,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
             if (triggerOption != null &&
                 triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
               // fire a retraction trigger because the session window will be enlarged
-              fireRetractionTrigger(sessionWindow);
+              fireRetractionTrigger(sessionWindow, false);
             }
             // create a new session window that covers the timestamp
             long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
@@ -105,8 +105,8 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
           if (triggerOption != null &&
               triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
             // fire a retraction trigger because the two session windows will be merged to a new window
-            fireRetractionTrigger(sessionWindow1);
-            fireRetractionTrigger(sessionWindow2);
+            fireRetractionTrigger(sessionWindow1, false);
+            fireRetractionTrigger(sessionWindow2, false);
           }
           long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
           long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
@@ -149,7 +149,7 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   {
     for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
       OutputValT outputVal = accumulation.getOutput(entry.getValue());
-      if (fireOnlyUpdatedPanes) {
+      if (fireOnlyUpdatedPanes && retractionStorage != null) {
         OutputValT oldValue = retractionStorage.get(window, entry.getKey());
         if (oldValue != null && oldValue.equals(outputVal)) {
           continue;
@@ -163,12 +163,21 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
   }
 
   @Override
-  public void fireRetractionTrigger(Window window)
+  public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
     if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
       throw new UnsupportedOperationException();
     }
     for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) {
+      if (fireOnlyUpdatedPanes) {
+        AccumT currentAccum = dataStorage.get(window, entry.getKey());
+        if (currentAccum != null) {
+          OutputValT currentValue = accumulation.getOutput(currentAccum);
+          if (currentValue != null && currentValue.equals(entry.getValue())) {
+            continue;
+          }
+        }
+      }
       output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 7275d88..26e011a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -56,7 +56,7 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
   {
     AccumT accumulatedValue = dataStorage.get(window);
     OutputT outputValue = accumulation.getOutput(accumulatedValue);
-    if (fireOnlyUpdatedPanes) {
+    if (fireOnlyUpdatedPanes && retractionStorage != null) {
       OutputT oldValue = retractionStorage.get(window);
       if (oldValue != null && oldValue.equals(outputValue)) {
         return;
@@ -69,13 +69,19 @@ public class WindowedOperatorImpl<InputT, AccumT, OutputT>
   }
 
   @Override
-  public void fireRetractionTrigger(Window window)
+  public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes)
   {
     if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
       throw new UnsupportedOperationException();
     }
     OutputT oldValue = retractionStorage.get(window);
     if (oldValue != null) {
+      if (fireOnlyUpdatedPanes) {
+        AccumT accumulatedValue = dataStorage.get(window);
+        if (accumulatedValue != null && oldValue.equals(accumulation.getOutput(accumulatedValue))) {
+          return;
+        }
+      }
       output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9f04db7f/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index bc5d80f..15aba82 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -298,20 +298,36 @@ public class WindowedOperatorTest
   @Test
   public void testTriggerWithAccumulatingModeFiringAllPanes()
   {
-    testTriggerWithAccumulatingModeHelper(false);
+    testTrigger2(false, false);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingModeFiringAllPanes()
+  {
+    testTrigger2(false, true);
   }
 
   @Test
   public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
   {
-    testTriggerWithAccumulatingModeHelper(true);
+    testTrigger2(true, false);
+  }
+
+  @Test
+  public void testTriggerWithAccumulatingAndRetractingModeFiringOnlyUpdatedPanes()
+  {
+    testTrigger2(true, true);
   }
 
-  public void testTriggerWithAccumulatingModeHelper(boolean firingOnlyUpdatedPanes)
+  private void testTrigger2(boolean firingOnlyUpdatedPanes, boolean testRetraction)
   {
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator();
-    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000))
-        .accumulatingFiredPanes();
+    TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
+    if (testRetraction) {
+      triggerOption.accumulatingAndRetractingFiredPanes();
+    } else {
+      triggerOption.accumulatingFiredPanes();
+    }
     if (firingOnlyUpdatedPanes) {
       triggerOption.firingOnlyUpdatedPanes();
     }
@@ -342,8 +358,14 @@ public class WindowedOperatorTest
       Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples
           .isEmpty());
     } else {
-      Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
-      Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      if (testRetraction) {
+        Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
+        Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
+      } else {
+        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
+        Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
+      }
     }
     windowedOperator.teardown();
   }
