Repository: apex-malhar Updated Branches: refs/heads/master 0885bfad2 -> 4cbbb7507
APEXMALHAR-2359 #resolve #comment Optimise fire trigger to avoid going through all data Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/875e47c7 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/875e47c7 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/875e47c7 Branch: refs/heads/master Commit: 875e47c7a74b7740c28b8fddd12bc990e9f8ae0a Parents: ca6995c Author: brightchen <[email protected]> Authored: Tue Nov 29 15:05:09 2016 -0800 Committer: brightchen <[email protected]> Committed: Tue Jan 17 20:55:43 2017 -0800 ---------------------------------------------------------------------- .../AbstractWindowedOperatorBenchmarkApp.java | 5 ++ .../KeyedWindowedOperatorBenchmarkApp.java | 24 ++++++ .../window/impl/AbstractWindowedOperator.java | 17 +++- .../window/impl/KeyedWindowedOperatorImpl.java | 77 +++++++++++++++++--- 4 files changed, 109 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java index 09f7653..64af9f9 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java @@ -106,6 +106,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O windowedOperator.setDataStorage(createDataStorage(sccImpl)); windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl)); 
windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage()); + setUpdatedKeyStorage(windowedOperator, conf, sccImpl); windowedOperator.setAccumulation(createAccumulation()); windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS)); @@ -121,6 +122,10 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O } } + protected void setUpdatedKeyStorage(O windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl) + { + } + protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl); protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java index 7b2085d..5a9c955 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java @@ -22,17 +22,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.GenericSerde; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple; +import org.apache.apex.malhar.lib.window.Window; import 
org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.apex.malhar.lib.window.accumulation.Count; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.lib.fileaccess.TFileImpl; import com.datatorrent.lib.util.KeyValPair; public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator> @@ -51,6 +57,12 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL); } + @Override + protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl) + { + windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl)); + } + protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl { private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class); @@ -124,6 +136,18 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB return dataStorage; } + protected SpillableSetMultimapImpl<Window, String> createUpdatedDataStorage(Configuration conf, + SpillableComplexComponentImpl sccImpl) + { + String basePath = getStoreBasePath(conf); + ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + + SpillableSetMultimapImpl<Window, String> dataStorage = new SpillableSetMultimapImpl<Window, String>(store, + new byte[] {(byte)1}, 0, new 
GenericSerde<Window>(), new GenericSerde<String>()); + return dataStorage; + } + @Override protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index e8ff622..4ba81b3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -83,9 +83,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext protected long currentWatermark = -1; private boolean triggerAtWatermark; protected long earlyTriggerCount; - private long earlyTriggerMillis; + protected long earlyTriggerMillis; protected long lateTriggerCount; - private long lateTriggerMillis; + protected long lateTriggerMillis; private long currentDerivedTimestamp = -1; private long timeIncrement; protected long fixedWatermarkMillis = -1; @@ -543,11 +543,20 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } + /** + * Whether to fire only updated panes; by default the trigger option's setting. fireTrigger passes this value to + * both the retraction and the normal trigger, so an override affects the two consistently. + */ + protected boolean isFiringOnlyUpdatedPanes() + { + return triggerOption.isFiringOnlyUpdatedPanes(); + } + @Override public void fireTrigger(Window window, WindowState windowState) { if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { - fireRetractionTrigger(window, triggerOption.isFiringOnlyUpdatedPanes()); + 
fireRetractionTrigger(window, isFiringOnlyUpdatedPanes()); } - fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes()); + fireNormalTrigger(window, isFiringOnlyUpdatedPanes()); windowState.lastTriggerFiredTime = currentDerivedTimestamp;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/875e47c7/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java index deb718b..2bfff03 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.SessionWindowedStorage; import org.apache.apex.malhar.lib.window.TriggerOption; @@ -33,6 +34,7 @@ import org.apache.apex.malhar.lib.window.WindowState; import org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.hadoop.classification.InterfaceStability; +import com.datatorrent.api.Context; import com.datatorrent.lib.util.KeyValPair; /** @@ -50,6 +52,23 @@ import com.datatorrent.lib.util.KeyValPair; public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? 
super InputValT, AccumT, OutputValT>> { + /** + * Optional storage of the keys updated in each window since that window last fired; see + * {@link #trackUpdatedKeys()} for when it is written and read. + */ + private SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage; + + @Override + public void setup(Context.OperatorContext context) + { + if (useUpdatedKeyStorage()) { + // Set up the backing store before the multimap that writes to it. + updatedKeyStorage.getStore().setup(context); + updatedKeyStorage.setup(context); + } + + super.setup(context); + } @Override protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple) @@ -135,6 +154,36 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> } @Override + public void endWindow() + { + super.endWindow(); + // NOTE(review): only setup() and endWindow() are forwarded to updatedKeyStorage and its store by this + // operator; confirm beginWindow/teardown/checkpoint callbacks are not needed as well. + if (useUpdatedKeyStorage()) { + updatedKeyStorage.endWindow(); + } + } + + /** + * Whether the optional updated-key storage is set and the trigger fires only updated panes. + */ + private boolean useUpdatedKeyStorage() + { + return updatedKeyStorage != null && isFiringOnlyUpdatedPanes(); + } + + /** + * Whether keys are recorded by {@link #accumulateTuple} and read back by {@link #fireNormalTrigger}. Recording + * is limited to configurations with early or late triggers, so the read side must use this same condition; + * otherwise a firing with nothing recorded (e.g. an at-watermark-only trigger) would emit nothing. 
+ */ + private boolean trackUpdatedKeys() + { + return useUpdatedKeyStorage() + && (earlyTriggerMillis > 0 || lateTriggerMillis > 0 || earlyTriggerCount > 0 || lateTriggerCount > 0); + } + + @Override public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputValT>> tuple) { KeyValPair<KeyT, InputValT> kvData = tuple.getValue(); 
@@ -145,24 +194,55 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> if (accum == null) { accum = accumulation.defaultAccumulatedValue(); } - dataStorage.put(window, key, accumulation.accumulate(accum, kvData.getValue())); + + InputValT inputValue = kvData.getValue(); + AccumT newValue = accumulation.accumulate(accum, inputValue); + if (trackUpdatedKeys()) { + updatedKeyStorage.put(window, key); + } + + dataStorage.put(window, key, newValue); } } @Override public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) { - for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { - OutputValT outputVal = accumulation.getOutput(entry.getValue()); - if (fireOnlyUpdatedPanes && retractionStorage != null) { - OutputValT oldValue = retractionStorage.get(window, entry.getKey()); - if (oldValue != null && oldValue.equals(outputVal)) { - continue; + if (trackUpdatedKeys()) { + // Visit only the keys recorded by accumulateTuple since this window last fired. + for (KeyT key : updatedKeyStorage.get(window)) { + AccumT accum = dataStorage.get(window, key); + if (accum == null) { + // NOTE(review): presumably only reachable if the window's data was removed after the key was recorded. 
+ continue; + } + OutputValT outputVal = accumulation.getOutput(accum); + if (retractionStorage != null) { + OutputValT oldValue = retractionStorage.get(window, key); + if (oldValue != null && oldValue.equals(outputVal)) { + continue; + } + } + output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(key, outputVal))); + if (retractionStorage != null) { + retractionStorage.put(window, key, outputVal); } } - output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal))); - if (retractionStorage != null) { - retractionStorage.put(window, entry.getKey(), outputVal); + updatedKeyStorage.removeAll(window); + } else { + for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { + OutputValT outputVal = accumulation.getOutput(entry.getValue()); + // No tracked keys: scan every key; when only updated panes are wanted, skip outputs equal to retractionStorage's. + if (fireOnlyUpdatedPanes && retractionStorage != null) { + OutputValT oldValue = retractionStorage.get(window, entry.getKey()); + if (oldValue != null && oldValue.equals(outputVal)) { + continue; + } + } + output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal))); + if (retractionStorage != null) { + retractionStorage.put(window, entry.getKey(), outputVal); + } } } } @@ -187,4 +267,22 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> } } + /** + * Returns the optional storage of keys updated per window since the window last fired, or {@code null} if + * none was set. + */ + public SpillableSetMultimapImpl<Window, KeyT> getUpdatedKeyStorage() + { + return updatedKeyStorage; + } + + /** + * Sets the optional storage that records the keys updated in each window; with early or late triggers and firing + * only updated panes, a firing then visits just those keys. When used, its store is set up by this operator. 
+ */ + public void setUpdatedKeyStorage(SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage) + { + this.updatedKeyStorage = updatedKeyStorage; + } + }
