APEXMALHAR-2345 purging time buckets in WindowedStorage
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9043f9d4 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9043f9d4 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9043f9d4 Branch: refs/heads/master Commit: 9043f9d4ea5514cbc7cdf7ad826eb19c73a9ef9e Parents: e22ea0d Author: David Yan <[email protected]> Authored: Thu Nov 17 12:58:34 2016 -0800 Committer: David Yan <[email protected]> Committed: Sun Jan 22 21:45:28 2017 -0800 ---------------------------------------------------------------------- .../AbstractWindowedOperatorBenchmarkApp.java | 2 ++ benchmark/src/test/resources/log4j.properties | 2 +- .../managed/IncrementalCheckpointManager.java | 11 ++++++---- .../managed/ManagedTimeUnifiedStateImpl.java | 2 +- .../malhar/lib/state/managed/StateTracker.java | 1 - .../spillable/SpillableComplexComponent.java | 2 ++ .../SpillableComplexComponentImpl.java | 5 +++++ .../lib/state/spillable/SpillableSetImpl.java | 2 +- .../apache/apex/malhar/lib/window/Window.java | 14 +++++-------- .../apex/malhar/lib/window/WindowedStorage.java | 5 +++++ .../window/impl/AbstractWindowedOperator.java | 16 ++++++++++++++- .../window/impl/InMemoryWindowedStorage.java | 13 ++++++++++++ .../impl/SpillableWindowedKeyedStorage.java | 21 ++++++++++++++++++++ .../impl/SpillableWindowedPlainStorage.java | 18 +++++++++++++++++ 14 files changed, 96 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java index 7250e74..4b9b423 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java @@ -25,6 +25,7 @@ import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.UnboundedTimeBucketAssigner; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore; @@ -141,6 +142,7 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O { String basePath = getStoreBasePath(conf); ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore(); + store.setTimeBucketAssigner(new UnboundedTimeBucketAssigner()); store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000)); ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/benchmark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties index 92e48b7..2f3bc53 100644 --- a/benchmark/src/test/resources/log4j.properties +++ b/benchmark/src/test/resources/log4j.properties @@ -41,5 +41,5 @@ log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=debug log4j.logger.org.apache.apex=debug -log4j.logger.org.apache.apex.malhar.lib.state.managed=info +log4j.logger.org.apache.apex.malhar.lib.state.managed=debug log4j.logger.com.datatorrent.common.util.FSStorageAgent=info http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index 3b01ed2..65c1d1e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -72,6 +72,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager protected transient ManagedStateContext managedStateContext; private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1); + private long latestPurgedTimeBucket = -1; private transient int waitMillis; private volatile long lastTransferredWindow = Stateless.WINDOW_ID; @@ -109,8 +110,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager transferWindowFiles(); if (latestExpiredTimeBucket.get() > -1) { try { - managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo( - latestExpiredTimeBucket.getAndSet(-1)); + latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1); + managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket); } catch (IOException e) { throwable.set(e); LOG.debug("delete files", e); @@ -133,8 +134,10 @@ public class IncrementalCheckpointManager extends FSWindowDataManager Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId); for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) { - managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(), - singleBucket.getValue()); + long bucketId = singleBucket.getKey(); + if (bucketId > latestPurgedTimeBucket) { + managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue()); + } } committed(windowId); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java index d558eee..62ebbc5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java @@ -142,7 +142,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem @Override public void setup(Context.OperatorContext context) { - // set UnBoundedTimeBucketAssigner to this managed state impl + // set UnboundedTimeBucketAssigner to this managed state impl if (timeBucketAssigner == null) { UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner(); setTimeBucketAssigner(unboundedTimeBucketAssigner); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java index 7cab41c..e17f5b9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java @@ -115,7 +115,6 @@ class StateTracker extends TimerTask long sizeFreed; try { sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow()); - LOG.debug("bucket freed {} {}", bucketId, sizeFreed); } catch (IOException e) { managedStateImpl.throwable.set(e); throw new RuntimeException("freeing " + bucketId, e); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index b6ec6a2..c00589a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -196,4 +196,6 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @return A {@link SpillableQueue}. */ <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde); + + SpillableStateStore getStore(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java index 1d9fbc6..1ecf86e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -261,4 +261,9 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent { store.committed(l); } + + public SpillableStateStore getStore() + { + return store; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java index 221cd38..33e48ac 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java @@ -131,7 +131,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable public SpillableSetImpl(@NotNull byte[] prefix, @NotNull SpillableStateStore store, @NotNull Serde<T> serde, - @NotNull TimeExtractor timeExtractor) + @NotNull TimeExtractor<T> timeExtractor) { map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java index 1d7681d..edcfc2c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/Window.java @@ -146,16 +146,12 @@ public interface Window<WINDOW extends Comparable<WINDOW>> extends Comparable<WI @Override public int compareTo(TimeWindow o) { - if (this.getBeginTimestamp() < o.getBeginTimestamp()) { - return -1; - } else if (this.getBeginTimestamp() > o.getBeginTimestamp()) { - return 1; - } else if (this.getDurationMillis() < o.getDurationMillis()) { - return -1; - } else if (this.getDurationMillis() > o.getDurationMillis()) { - return 1; + long diff = (this.getBeginTimestamp() + this.getDurationMillis()) - (o.getBeginTimestamp() + o.getDurationMillis()); + if (diff == 0) { + return Long.signum(this.getBeginTimestamp() - o.getBeginTimestamp()); + } else { + return Long.signum(diff); } - return 0; } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java index e2874ba..55f8a02 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java @@ -56,6 +56,11 @@ public interface WindowedStorage extends Component<Context.OperatorContext> void remove(Window window); /** + * Purge checkpointed data for all the windows that lie totally beyond the given horizon + */ + void purge(long horizonMillis); + + /** * This interface handles plain value per window. If there is a key/value map for each window, use * {@link WindowedKeyedStorage}. Also note that a single T object is assumed to be fit in memory * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java index 4ba81b3..22e8525 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import javax.validation.ValidationException; @@ -89,6 +90,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext private long currentDerivedTimestamp = -1; private long timeIncrement; protected long fixedWatermarkMillis = -1; + private transient long streamingWindowId; + private transient TreeMap<Long, Long> streamingWindowToLatenessHorizon = new TreeMap<>(); private Map<String, Component<Context.OperatorContext>> components = new HashMap<>(); @@ -407,7 +410,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext public void dropTuple(Tuple input) { // do nothing - LOG.debug("Dropping late tuple {}", input); } @Override @@ -465,6 +467,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } else { currentDerivedTimestamp += timeIncrement; } + streamingWindowId = windowId; } /** @@ -517,6 +520,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext } } } + streamingWindowToLatenessHorizon.put(streamingWindowId, horizon); controlOutput.emit(new WatermarkImpl(nextWatermark)); this.currentWatermark = nextWatermark; } @@ -623,5 +627,15 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext ((CheckpointNotificationListener)component).committed(windowId); } } + Long floorWindowId = streamingWindowToLatenessHorizon.floorKey(windowId); + if (floorWindowId != null) { + long horizon = streamingWindowToLatenessHorizon.get(floorWindowId); + windowStateMap.purge(horizon); + dataStorage.purge(horizon); + if (retractionStorage != null) { + retractionStorage.purge(horizon); + } + streamingWindowToLatenessHorizon.headMap(windowId, true).clear(); + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java index db18a40..730bcf9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/InMemoryWindowedStorage.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.lib.window.impl; +import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -86,4 +87,16 @@ public class InMemoryWindowedStorage<T> implements WindowedStorage.WindowedPlain { } + @Override + public void purge(long horizonMillis) + { + for (Iterator<Map.Entry<Window, T>> iterator = map.entrySet().iterator(); iterator.hasNext(); ) { + Window window = iterator.next().getKey(); + if (window.getBeginTimestamp() + window.getDurationMillis() < horizonMillis) { + iterator.remove(); + } else { + break; + } + } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java index bf0c804..6138f97 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java @@ -25,8 +25,13 @@ import java.util.Set; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.GenericSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.window.Window; @@ -57,6 +62,9 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap; protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap; + private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedKeyedStorage.class); + + private class KVIterator implements Iterator<Map.Entry<K, V>> { final Window window; @@ -221,4 +229,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind { return windowKeyToValueMap.get(new ImmutablePair<>(window, key)); } + + @Override + public void purge(long horizonMillis) + { + SpillableStateStore store = scc.getStore(); + if (store instanceof ManagedTimeUnifiedStateImpl) { + ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store; + long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis(); + LOG.debug("Purging state less than equal to {}", purgeTimeBucket); + timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket); + } + } + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9043f9d4/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java index f70771c..a58d89f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java @@ -22,8 +22,13 @@ import java.util.Map; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.GenericSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.window.Window; @@ -45,6 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe private long bucket; private Serde<Window> windowSerde; private Serde<T> valueSerde; + private static final Logger LOG = LoggerFactory.getLogger(SpillableWindowedPlainStorage.class); protected Spillable.SpillableMap<Window, T> windowToDataMap; @@ -143,4 +149,16 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe public void teardown() { } + + @Override + public void purge(long horizonMillis) + { + SpillableStateStore store = scc.getStore(); + if (store instanceof ManagedTimeUnifiedStateImpl) { + ManagedTimeUnifiedStateImpl timeState = (ManagedTimeUnifiedStateImpl)store; + long purgeTimeBucket = horizonMillis - timeState.getTimeBucketAssigner().getBucketSpan().getMillis(); + LOG.debug("Purging state less than equal to {}", purgeTimeBucket); + timeState.purgeTimeBucketsLessThanEqualTo(purgeTimeBucket); + } + } }
