Repository: apex-malhar Updated Branches: refs/heads/master 52510b0f8 -> 4ab457f18
APEXMALHAR-2301 Refactor timebucketassigner to add a single timebucket assigner and change the timebucket metadata from array to map to handle unbounded time buckets This closes #503 Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4ab457f1 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4ab457f1 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4ab457f1 Branch: refs/heads/master Commit: 4ab457f18a8af8892d8f3f4a5f6e6c2eb50995ac Parents: 52510b0 Author: Siyuan Hua <[email protected]> Authored: Sun Nov 20 22:01:59 2016 -0800 Committer: David Yan <[email protected]> Committed: Sun Jan 15 20:44:06 2017 -0800 ---------------------------------------------------------------------- .../apex/malhar/lib/dedup/AbstractDeduper.java | 4 +- .../malhar/lib/dedup/BoundedDedupOperator.java | 4 +- .../lib/dedup/TimeBasedDedupOperator.java | 6 +- .../AbstractManagedStateInnerJoinOperator.java | 8 +- .../state/managed/AbstractManagedStateImpl.java | 64 ++++--- .../lib/state/managed/ManagedStateImpl.java | 4 +- .../lib/state/managed/ManagedTimeStateImpl.java | 15 +- .../managed/ManagedTimeStateMultiValue.java | 8 +- .../managed/ManagedTimeUnifiedStateImpl.java | 38 ++-- .../MovingBoundaryTimeBucketAssigner.java | 180 +++++++++++++++++++ .../malhar/lib/state/managed/StateTracker.java | 2 +- .../lib/state/managed/TimeBucketAssigner.java | 167 ++++------------- .../malhar/lib/state/managed/TimeExtractor.java | 23 +++ .../managed/UnboundedTimeBucketAssigner.java | 70 ++++++++ .../spillable/SpillableSetMultimapImpl.java | 27 +-- .../impl/SpillableSessionWindowedStorage.java | 6 +- .../ManagedTimeUnifiedStateImplTest.java | 12 +- .../state/managed/MockManagedStateContext.java | 6 +- .../MovingBoundaryTimeBucketAssignerTest.java | 116 ++++++++++++ .../state/managed/TimeBucketAssignerTest.java | 116 ------------ .../malhar/lib/window/WindowedOperatorTest.java | 3 +- 21 files changed, 526 
insertions(+), 353 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java index c73cea7..bf81fde 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.state.BucketedState; import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl; import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; -import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; @@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.Slice; /** * Abstract class which allows de-duplicating incoming tuples based on a configured key. - * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * Also supports expiry mechanism based on a configurable expiry period configured using {@link MovingBoundaryTimeBucketAssigner} * in {@link ManagedTimeUnifiedStateImpl} * Following steps are used in identifying the state of a particular tuple: * 1. Check if the time of the tuple falls in an expired bucket. 
If so, the tuple is expired http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java index 7763103..e2a1297 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java @@ -24,7 +24,7 @@ import java.util.concurrent.Future; import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; -import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner; import org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -110,7 +110,7 @@ public class BoundedDedupOperator extends AbstractDeduper<Object> numBuckets = DEFAULT_NUM_BUCKETS; } ((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets); - TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner(); managedState.setTimeBucketAssigner(timeBucketAssigner); super.setup(context); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java index 3f888cc..3b5f5e2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java +++ 
b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java @@ -25,7 +25,7 @@ import javax.validation.constraints.NotNull; import org.joda.time.Duration; import org.joda.time.Instant; import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; -import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner; import org.apache.hadoop.classification.InterfaceStability.Evolving; import com.datatorrent.api.Context; @@ -64,7 +64,7 @@ import com.datatorrent.netlet.util.Slice; * 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry. * Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry * to be from that instant = r. The start and end of the expiry window periodically move by the span of a single - * bucket. Refer {@link TimeBucketAssigner} for details. + * bucket. Refer {@link MovingBoundaryTimeBucketAssigner} for details. * * Additionally, it also needs the following parameters: * 1. 
{@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO) @@ -151,7 +151,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A @Override public void setup(OperatorContext context) { - TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner(); timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan)); timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore)); timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java index c82c3e3..453da80 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import org.joda.time.Duration; import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.hadoop.fs.Path; import com.google.common.collect.Maps; @@ -68,12 +69,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac stream2Store = new ManagedTimeStateImpl(); stream1Store.setNumBuckets(noOfBuckets); stream2Store.setNumBuckets(noOfBuckets); + 
assert stream1Store.getTimeBucketAssigner() == stream2Store.getTimeBucketAssigner(); if (bucketSpanTime != null) { stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); - stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); } - stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); - stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + if (stream1Store.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner) { + ((MovingBoundaryTimeBucketAssigner)stream1Store.getTimeBucketAssigner()).setExpireBefore(Duration.millis(getExpiryTime())); + } stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary()); stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index 364bc19..daae2d8 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.state.managed; import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -128,14 +129,14 @@ public abstract class AbstractManagedStateImpl { private long maxMemorySize; - protected int numBuckets; + protected long numBuckets; @NotNull private FileAccess fileAccess = new TFileImpl.DTFileImpl(); 
@NotNull - protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + protected TimeBucketAssigner timeBucketAssigner; - protected Bucket[] buckets; + protected Map<Long, Bucket> buckets; @Min(1) private int numReaders = 1; @@ -176,6 +177,11 @@ public abstract class AbstractManagedStateImpl operatorContext = context; fileAccess.init(); + if (timeBucketAssigner == null) { + // set default time bucket assigner + MovingBoundaryTimeBucketAssigner movingBoundaryTimeBucketAssigner = new MovingBoundaryTimeBucketAssigner(); + setTimeBucketAssigner(movingBoundaryTimeBucketAssigner); + } timeBucketAssigner.setPurgeListener(this); //setup all the managed state components @@ -184,11 +190,11 @@ public abstract class AbstractManagedStateImpl bucketsFileSystem.setup(this); if (buckets == null) { - //create buckets array only once at start when it is not created. + //create buckets map only once at start if it is not created. numBuckets = getNumBuckets(); - buckets = new Bucket[numBuckets]; + buckets = new HashMap<>(); } - for (Bucket bucket : buckets) { + for (Bucket bucket : buckets.values()) { if (bucket != null) { bucket.setup(this); } @@ -210,8 +216,8 @@ public abstract class AbstractManagedStateImpl stateEntry.getValue(); if (state != null && !state.isEmpty()) { for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> bucketEntry : state.entrySet()) { - int bucketIdx = prepareBucket(bucketEntry.getKey()); - buckets[bucketIdx].recoveredData(stateEntry.getKey(), bucketEntry.getValue()); + long bucketIdx = prepareBucket(bucketEntry.getKey()); + buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue()); } } checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/); @@ -231,7 +237,7 @@ public abstract class AbstractManagedStateImpl * * @return number of buckets. 
*/ - public abstract int getNumBuckets(); + public abstract long getNumBuckets(); public void beginWindow(long windowId) { @@ -246,17 +252,17 @@ public abstract class AbstractManagedStateImpl * @param bucketId bucket key * @return bucket index */ - protected int prepareBucket(long bucketId) + protected long prepareBucket(long bucketId) { stateTracker.bucketAccessed(bucketId); - int bucketIdx = getBucketIdx(bucketId); + long bucketIdx = getBucketIdx(bucketId); - Bucket bucket = buckets[bucketIdx]; + Bucket bucket = buckets.get(bucketIdx); if (bucket == null) { //bucket is not in memory bucket = newBucket(bucketId); bucket.setup(this); - buckets[bucketIdx] = bucket; + buckets.put(bucketIdx, bucket); } else if (bucket.getBucketId() != bucketId) { handleBucketConflict(bucketIdx, bucketId); } @@ -269,13 +275,13 @@ public abstract class AbstractManagedStateImpl Preconditions.checkNotNull(value, "value"); if (timeBucket != -1) { //time bucket is invalid data is not stored - int bucketIdx = prepareBucket(bucketId); + long bucketIdx = prepareBucket(bucketId); //synchronization on a bucket isn't required for put because the event is added to flash which is //a concurrent map. The assumption here is that the calls to put & get(sync/async) are being made synchronously by //a single thread (operator thread). The get(sync/async) always checks memory first synchronously. //If the key is not in the memory, then the async get will uses other reader threads which will fetch it from //the files. 
- buckets[bucketIdx].put(key, timeBucket, value); + buckets.get(bucketIdx).put(key, timeBucket, value); } } @@ -283,8 +289,8 @@ public abstract class AbstractManagedStateImpl protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key) { Preconditions.checkNotNull(key, "key"); - int bucketIdx = prepareBucket(bucketId); - Bucket bucket = buckets[bucketIdx]; + long bucketIdx = prepareBucket(bucketId); + Bucket bucket = buckets.get(bucketIdx); synchronized (bucket) { return bucket.get(key, timeBucket, Bucket.ReadSource.ALL); } @@ -294,8 +300,8 @@ public abstract class AbstractManagedStateImpl protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key) { Preconditions.checkNotNull(key, "key"); - int bucketIdx = prepareBucket(bucketId); - Bucket bucket = buckets[bucketIdx]; + long bucketIdx = prepareBucket(bucketId); + Bucket bucket = buckets.get(bucketIdx); synchronized (bucket) { Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY); if (cachedVal != null) { @@ -307,20 +313,20 @@ public abstract class AbstractManagedStateImpl } } - protected void handleBucketConflict(int bucketIdx, long newBucketId) + protected void handleBucketConflict(long bucketIdx, long newBucketId) { - throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId); + throw new IllegalArgumentException("bucket conflict " + buckets.get(bucketIdx).getBucketId() + " " + newBucketId); } - protected int getBucketIdx(long bucketId) + protected long getBucketIdx(long bucketId) { - return (int)Math.abs(bucketId % numBuckets); + return Math.abs(bucketId % numBuckets); } @Override public Bucket getBucket(long bucketId) { - return buckets[getBucketIdx(bucketId)]; + return buckets.get(getBucketIdx(bucketId)); } @Override @@ -330,7 +336,7 @@ public abstract class AbstractManagedStateImpl if (b == null) { b = newBucket(bucketId); b.setup(this); - buckets[getBucketIdx(bucketId)] 
= b; + buckets.put(getBucketIdx(bucketId), b); } return b; } @@ -351,7 +357,7 @@ public abstract class AbstractManagedStateImpl { Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap(); - for (Bucket bucket : buckets) { + for (Bucket bucket : buckets.values()) { if (bucket != null) { synchronized (bucket) { Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId); @@ -381,7 +387,7 @@ public abstract class AbstractManagedStateImpl { synchronized (commitLock) { try { - for (Bucket bucket : buckets) { + for (Bucket bucket : buckets.values()) { if (bucket != null) { synchronized (bucket) { bucket.committed(windowId); @@ -402,7 +408,7 @@ public abstract class AbstractManagedStateImpl public Map<Long, Long> getBucketMemoryUsage() { Map<Long, Long> bucketToSize = Maps.newHashMap(); - for (Bucket bucket : buckets) { + for (Bucket bucket : buckets.values()) { if (bucket == null) { continue; } @@ -419,7 +425,7 @@ public abstract class AbstractManagedStateImpl bucketsFileSystem.teardown(); timeBucketAssigner.teardown(); readerService.shutdownNow(); - for (Bucket bucket : buckets) { + for (Bucket bucket : buckets.values()) { if (bucket != null) { synchronized (bucket) { bucket.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java index ba8cdc6..bf2209f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java @@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket @Override public void put(long bucketId, 
@NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); putInBucket(bucketId, timeBucket, key, value); } @@ -88,7 +88,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket @Min(1) @Override - public int getNumBuckets() + public long getNumBuckets() { return numBuckets; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java index eddc736..caa5e2b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java @@ -28,6 +28,7 @@ import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState; import com.google.common.util.concurrent.Futures; +import com.datatorrent.api.Context; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.netlet.util.Slice; @@ -48,7 +49,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); putInBucket(bucketId, timeBucket, key, value); } @@ -61,7 +62,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public Slice getSync(long bucketId, long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = 
timeBucketAssigner.getTimeBucket(time); if (timeBucket == -1) { //time is expired so no point in looking further. return BucketedState.EXPIRED; @@ -78,7 +79,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public Future<Slice> getAsync(long bucketId, long time, Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); if (timeBucket == -1) { //time is expired so no point in looking further. return Futures.immediateFuture(BucketedState.EXPIRED); @@ -88,11 +89,17 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Min(1) @Override - public int getNumBuckets() + public long getNumBuckets() { return numBuckets; } + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + } + /** * Sets the number of buckets. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java index bd3319f..19a6abc 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java @@ -182,7 +182,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM { if (isKeyContainsMultiValue) { Slice keySlice = streamCodec.toByteArray(k); - int bucketId = getBucketId(k); + long bucketId = getBucketId(k); Slice valueSlice = store.getSync(bucketId, keySlice); List<V> listOb; if (valueSlice == null || valueSlice.length == 0) { @@ -207,7 +207,7 @@ public class 
ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM { if (isKeyContainsMultiValue) { Slice keySlice = streamCodec.toByteArray(k); - int bucketId = getBucketId(k); + long bucketId = getBucketId(k); Slice valueSlice = store.getSync(bucketId, keySlice); List<V> listOb; if (valueSlice == null || valueSlice.length == 0) { @@ -232,7 +232,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM */ private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice) { - long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket); + long timeBucketId = store.getTimeBucketAssigner().getTimeBucket(timeBucket); if (timeBucketId != -1) { store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice); return true; @@ -270,7 +270,7 @@ public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListM throw new UnsupportedOperationException(); } - public int getBucketId(K k) + public long getBucketId(K k) { return k.hashCode() % store.getNumBuckets(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java index e04286e..d558eee 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java @@ -40,6 +40,7 @@ import com.google.common.collect.Queues; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; +import com.datatorrent.api.Context; import com.datatorrent.lib.fileaccess.FileAccess; import 
com.datatorrent.netlet.util.Slice; @@ -59,7 +60,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem } @Override - public int getNumBuckets() + public long getNumBuckets() { return timeBucketAssigner.getNumBuckets(); } @@ -67,14 +68,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem @Override public void put(long time, @NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); putInBucket(timeBucket, timeBucket, key, value); } @Override public Slice getSync(long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); if (timeBucket == -1) { //time is expired so return expired slice. return BucketedState.EXPIRED; @@ -85,7 +86,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem @Override public Future<Slice> getAsync(long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); + long timeBucket = timeBucketAssigner.getTimeBucket(time); if (timeBucket == -1) { //time is expired so return expired slice. 
return Futures.immediateFuture(BucketedState.EXPIRED); @@ -101,10 +102,10 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem //collect all the purged time buckets while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) { - int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket); - if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) { - bucketsForTeardown.add(buckets[purgedTimeBucketIdx]); - buckets[purgedTimeBucketIdx] = null; + long purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket); + if (buckets.containsKey(purgedTimeBucketIdx) && buckets.get(purgedTimeBucketIdx).getBucketId() == purgedTimeBucket) { + bucketsForTeardown.add(buckets.get(purgedTimeBucketIdx)); + buckets.remove(purgedTimeBucketIdx); } } @@ -121,14 +122,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem } @Override - protected void handleBucketConflict(int bucketIdx, long newBucketId) + protected void handleBucketConflict(long bucketId, long newBucketId) { - Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value" + Preconditions.checkArgument(buckets.get(bucketId).getBucketId() < newBucketId, "new time bucket should have a value" + " greater than the old time bucket"); //Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts. 
- bucketsForTeardown.add(buckets[bucketIdx]); - buckets[bucketIdx] = newBucket(newBucketId); - buckets[bucketIdx].setup(this); + bucketsForTeardown.add(buckets.get(bucketId)); + buckets.put(bucketId, newBucket(newBucketId)); + buckets.get(bucketId).setup(this); } @Override @@ -138,6 +139,17 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem super.purgeTimeBucketsLessThanEqualTo(timeBucket); } + @Override + public void setup(Context.OperatorContext context) + { + // set UnBoundedTimeBucketAssigner to this managed state impl + if (timeBucketAssigner == null) { + UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner(); + setTimeBucketAssigner(unboundedTimeBucketAssigner); + } + super.setup(context); + } + /** * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because * multiple partitions may work on same time-buckets. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java new file mode 100644 index 0000000..ece7686 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import javax.validation.constraints.NotNull; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +/** + * Keeps track of time buckets and triggers purging of obsolete time-buckets that moved out of boundary.<br/> + * + * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets, + * which time-bucket an event falls into, and how the time boundaries slide. + * <p/> + * + * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default: the system + * time at which this assigner is instantiated) are used to calculate the number of time-buckets.<br/> + * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and + * <code>referenceInstant = currentTime</code>, then <code> + * numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/> + * + * These properties once configured shouldn't be changed because that will result in different time-buckets + * for the same (key,time) pair after a failure. + * <p/> + * + * The time boundaries, start and end, move by multiples of time-bucket span. Any event with time < start + * is considered expired. The boundaries slide by {@link #getTimeBucket(long)}. The time which is passed as an + * argument to this method can be ahead of <code>end</code>.
This means that the corresponding event is a future event + * (with respect to the TimeBucketAssigner) and cannot be ignored. Therefore it is accounted for by sliding the boundaries further. + * + */ +public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner +{ + private long start; + + private long end; + + @NotNull + private Instant referenceInstant = new Instant(); + + @NotNull + @FieldSerializer.Bind(JavaSerializer.class) + private Duration expireBefore = Duration.standardDays(2); + + private long bucketSpanMillis; + + private int numBuckets; + private transient long fixedStart; + private transient boolean triggerPurge; + private transient long lowestPurgeableTimeBucket; + + + @Override + public void setup(@NotNull ManagedStateContext managedStateContext) + { + super.setup(managedStateContext); + fixedStart = referenceInstant.getMillis() - expireBefore.getMillis(); + + if (!isInitialized()) { + + start = fixedStart; + bucketSpanMillis = getBucketSpan().getMillis(); + numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis); + end = start + (numBuckets * bucketSpanMillis); + + setInitialized(true); + } + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + if (triggerPurge && getPurgeListener() != null) { + triggerPurge = false; + getPurgeListener().purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket); + } + } + + @Override + public void teardown() + { + } + + /** + * Returns the time-bucket key for the given time, sliding the boundaries forward when the time is at or past the end. + * + * @param time value from which bucket key is derived. + * @return -1 if value is already expired; bucket key otherwise.
+ */ + @Override + public long getTimeBucket(long time) + { + if (time < start) { + return -1; + } + long diffFromStart = time - fixedStart; + long key = diffFromStart / bucketSpanMillis; + if (time >= end) { + long diffInBuckets = (time - end) / bucketSpanMillis; + long move = (diffInBuckets + 1) * bucketSpanMillis; + start += move; + end += move; + triggerPurge = true; + lowestPurgeableTimeBucket += diffInBuckets; + } + return key; + + } + + /** + * @return number of buckets. + */ + @Override + public long getNumBuckets() + { + return numBuckets; + } + + /** + * @return reference instant + */ + public Instant getReferenceInstant() + { + return referenceInstant; + } + + /** + * Sets the reference instant (by default the system time when the streaming app is created). + * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}. + * + * @param referenceInstant + */ + public void setReferenceInstant(Instant referenceInstant) + { + this.referenceInstant = referenceInstant; + } + + /** + * @return duration before which the data is expired. + */ + public Duration getExpireBefore() + { + return expireBefore; + } + + /** + * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired. 
+ * @param expireBefore duration + */ + public void setExpireBefore(Duration expireBefore) + { + this.expireBefore = expireBefore; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java index 225439f..7cab41c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java @@ -87,7 +87,7 @@ class StateTracker extends TimerTask //freeing of state needs to be stopped during commit as commit results in transferring data to a state which // can be freed up as well. long bytesSum = 0; - for (Bucket bucket : managedStateImpl.buckets) { + for (Bucket bucket : managedStateImpl.buckets.values()) { if (bucket != null) { bytesSum += bucket.getSizeInBytes(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java index d218b37..2cae914 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java @@ -21,7 +21,8 @@ package org.apache.apex.malhar.lib.state.managed; import javax.validation.constraints.NotNull; import org.joda.time.Duration; -import org.joda.time.Instant; + +import org.apache.apex.malhar.lib.state.spillable.WindowListener; import 
com.esotericsoftware.kryo.serializers.FieldSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; @@ -30,180 +31,79 @@ import com.google.common.base.Preconditions; import com.datatorrent.api.Context; /** - * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/> - * - * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets, - * which time-bucket an event falls into and sliding the time boundaries. - * <p/> - * - * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system - * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/> - * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and - * <code>rererenceInstant = currentTime</code>, then <code> - * numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/> - * - * These properties once configured shouldn't be changed because that will result in different time-buckets - * for the same (key,time) pair after a failure. - * <p/> - * - * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start - * is considered expired. The boundaries slide by {@link #getTimeBucketAndAdjustBoundaries(long)}. The time which is passed as an - * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event - * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further. 
- * - * @since 3.4.0 + * Abstract class to extract a bucket for a given time */ -public class TimeBucketAssigner implements ManagedStateComponent +public abstract class TimeBucketAssigner implements ManagedStateComponent, WindowListener { - @NotNull - private Instant referenceInstant = new Instant(); + private transient PurgeListener purgeListener; - @NotNull - @FieldSerializer.Bind(JavaSerializer.class) - private Duration expireBefore = Duration.standardDays(2); + private boolean initialized; @FieldSerializer.Bind(JavaSerializer.class) private Duration bucketSpan; - private long bucketSpanMillis; - - private long start; - private long end; - private int numBuckets; - private transient long fixedStart; - private transient boolean triggerPurge; - private transient long lowestPurgeableTimeBucket; - - private boolean initialized; - - private transient PurgeListener purgeListener; - @Override public void setup(@NotNull ManagedStateContext managedStateContext) { Context.OperatorContext context = managedStateContext.getOperatorContext(); - fixedStart = referenceInstant.getMillis() - expireBefore.getMillis(); - - if (!initialized) { - if (bucketSpan == null) { - bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * - context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)); - } - start = fixedStart; - bucketSpanMillis = bucketSpan.getMillis(); - numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis); - end = start + (numBuckets * bucketSpanMillis); - - initialized = true; - } - } - - public void endWindow() - { - if (triggerPurge && purgeListener != null) { - triggerPurge = false; - purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket); - } - } - - @Override - public void teardown() - { - } - - /** - * Get the bucket key for the long value and adjust boundaries if necessary. - * - * @param value value from which bucket key is derived. 
- * @return -1 if value is already expired; bucket key otherwise. - */ - public long getTimeBucketAndAdjustBoundaries(long value) - { - if (value < start) { - return -1; - } - long diffFromStart = value - fixedStart; - long key = diffFromStart / bucketSpanMillis; - if (value >= end) { - long diffInBuckets = (value - end) / bucketSpanMillis; - long move = (diffInBuckets + 1) * bucketSpanMillis; - start += move; - end += move; - triggerPurge = true; - lowestPurgeableTimeBucket += diffInBuckets; + if (!initialized && bucketSpan == null) { + setBucketSpan(Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS))); } - return key; - } /** - * Sets the purge listener. - * @param purgeListener purge listener + * Get the time bucket for any given time + * @param time + * @return */ - public void setPurgeListener(@NotNull PurgeListener purgeListener) - { - this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener"); - } + public abstract long getTimeBucket(long time); /** - * @return number of buckets. + * Get possible number of buckets + * @return */ - public int getNumBuckets() - { - return numBuckets; - } + public abstract long getNumBuckets(); /** - * @return reference instant + * @return time-bucket span */ - public Instant getReferenceInstant() + public Duration getBucketSpan() { - return referenceInstant; + return bucketSpan; } /** - * Sets the reference instant (by default the system time when the streaming app is created). - * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}. - * - * @param referenceInstant + * Sets the length of a time bucket. 
+ * @param bucketSpan length of time bucket */ - public void setReferenceInstant(Instant referenceInstant) + public void setBucketSpan(Duration bucketSpan) { - this.referenceInstant = referenceInstant; + this.bucketSpan = bucketSpan; } - /** - * @return duration before which the data is expired. - */ - public Duration getExpireBefore() + public boolean isInitialized() { - return expireBefore; + return initialized; } - /** - * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired. - * @param expireBefore duration - */ - public void setExpireBefore(Duration expireBefore) + public void setInitialized(boolean initialized) { - this.expireBefore = expireBefore; + this.initialized = initialized; } /** - * @return time-bucket span + * Sets the purge listener. + * @param purgeListener purge listener */ - public Duration getBucketSpan() + public void setPurgeListener(@NotNull PurgeListener purgeListener) { - return bucketSpan; + this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener"); } - /** - * Sets the length of a time bucket. 
- * @param bucketSpan length of time bucket - */ - public void setBucketSpan(Duration bucketSpan) + public PurgeListener getPurgeListener() { - this.bucketSpan = bucketSpan; + return purgeListener; } /** @@ -214,5 +114,4 @@ public class TimeBucketAssigner implements ManagedStateComponent { void purgeTimeBucketsLessThanEqualTo(long timeBucket); } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java index 5d706db..5a2fa36 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java @@ -26,4 +26,27 @@ package org.apache.apex.malhar.lib.state.managed; public interface TimeExtractor<T> { long getTime(T t); + + class FixedTimeExtractor<V> implements TimeExtractor<V> + { + + private long fixedTime; + + public FixedTimeExtractor(long fixedTime) + { + this.fixedTime = fixedTime; + } + + private FixedTimeExtractor() + { + // For kryo + } + + @Override + public long getTime(V v) + { + return fixedTime; + } + + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java new file mode 100644 index 0000000..2027249 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/UnboundedTimeBucketAssigner.java @@ -0,0 +1,70 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import javax.validation.constraints.NotNull; + +import com.google.common.base.Preconditions; + +/** + * Simple Time bucket assigner to assiger time bucket for any given time <br> + * The algorithm is simple to just round the time to time bucket span. <br> + * Ex. given time bucket span is 1000 milliseconds <br> + * All times 1001, 1002 ... 
1999 will be assigned to time bucket 1000 <br> + * + * + */ +public class UnboundedTimeBucketAssigner extends TimeBucketAssigner +{ + @Override + public long getTimeBucket(long time) + { + + Preconditions.checkArgument(time >= 0, "Time: %s is illegal", time); + return time - time % getBucketSpan().getMillis(); + } + + @Override + public long getNumBuckets() + { + return Long.MAX_VALUE; + } + + @Override + public void setup(@NotNull ManagedStateContext managedStateContext) + { + super.setup(managedStateContext); + setInitialized(true); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java index 24cc8b2..cc976f1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -57,29 +57,6 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul Spillable.SpillableComponent { - private static class FixedTimeExtractor<V> implements TimeExtractor<V> - { - - private long fixedTime; - - private FixedTimeExtractor(long fixedTime) - { - this.fixedTime = fixedTime; - } - - private FixedTimeExtractor() - { - // For kryo - } - - @Override - public long getTime(V v) - { - return fixedTime; - } - - } - public static final int DEFAULT_BATCH_SIZE = 1000; public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; @@ -176,7 +153,7 @@ public class 
SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false); if (timeExtractor != null) { - spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime)); + spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(keyTime)); } else { spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde); } @@ -288,7 +265,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul if (timeExtractor == null) { spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde); } else { - spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key))); + spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new TimeExtractor.FixedTimeExtractor(timeExtractor.getTime(key))); } spillableSet.setup(context); cache.put(key, spillableSet); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java index ef06eea..6a16e62 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; 
+import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.window.SessionWindowedStorage; @@ -54,10 +55,13 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye if (keyToWindowsMap == null) { // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on. // This is logged in APEXMALHAR-2271 - keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde); + // A work around to make session window data never expire and all kept in one time bucket + keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde, new TimeExtractor.FixedTimeExtractor(Long.MAX_VALUE)); } } + + @Override @SuppressWarnings("unchecked") public void remove(Window window) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java index 82428fb..1d2334d 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -32,7 +32,6 @@ import org.junit.runner.Description; import com.datatorrent.api.Context; import com.datatorrent.lib.fileaccess.FileAccessFSImpl; -import com.datatorrent.lib.util.KryoCloneUtils; import com.datatorrent.lib.util.TestUtils; import com.datatorrent.netlet.util.Slice; @@ -66,13 +65,6 @@ public class 
ManagedTimeUnifiedStateImplTest public TestMeta testMeta = new TestMeta(); @Test - public void testSerde() throws IOException - { - ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState); - Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets()); - } - - @Test public void testSimplePutGet() { Slice one = ManagedStateTestUtils.getSliceFor("1"); @@ -129,7 +121,7 @@ public class ManagedTimeUnifiedStateImplTest testMeta.managedState.setup(testMeta.operatorContext); - long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time); + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly @@ -151,7 +143,7 @@ public class ManagedTimeUnifiedStateImplTest testMeta.managedState.setup(testMeta.operatorContext); - long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time); + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucket(time); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java index 8ae4db7..ef67b94 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java @@ -31,7 
+31,7 @@ class MockManagedStateContext implements ManagedStateContext private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl(); private Comparator<Slice> keyComparator = new SliceComparator(); private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem(); - private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + private MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner(); private final Context.OperatorContext operatorContext; @@ -58,7 +58,7 @@ class MockManagedStateContext implements ManagedStateContext } @Override - public TimeBucketAssigner getTimeBucketAssigner() + public MovingBoundaryTimeBucketAssigner getTimeBucketAssigner() { return timeBucketAssigner; } @@ -84,7 +84,7 @@ class MockManagedStateContext implements ManagedStateContext this.bucketsFileSystem = bucketsFileSystem; } - void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner) + void setTimeBucketAssigner(MovingBoundaryTimeBucketAssigner timeBucketAssigner) { this.timeBucketAssigner = timeBucketAssigner; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java new file mode 100644 index 0000000..e4e5d2e --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; + +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.util.KryoCloneUtils; + +public class MovingBoundaryTimeBucketAssignerTest +{ + + class TestMeta extends TestWatcher + { + MovingBoundaryTimeBucketAssigner timeBucketAssigner; + MockManagedStateContext mockManagedStateContext; + + @Override + protected void starting(Description description) + { + timeBucketAssigner = new MovingBoundaryTimeBucketAssigner(); + mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9)); + } + + @Override + protected void finished(Description description) + { + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSerde() throws IOException + { + MovingBoundaryTimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner); + Assert.assertNotNull("time bucket assigner", deserialized); + } + + @Test + public void testNumBuckets() + { + testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1)); + testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30)); + + testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); + + 
Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets()); + testMeta.timeBucketAssigner.teardown(); + } + + @Test + public void testTimeBucketKey() + { + testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1)); + testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30)); + + long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); + testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); + + long time1 = referenceTime - Duration.standardMinutes(2).getMillis(); + Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time1)); + + long time0 = referenceTime - Duration.standardMinutes(40).getMillis(); + Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucket(time0)); + + long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis(); + Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(expiredTime)); + testMeta.timeBucketAssigner.teardown(); + } + + @Test + public void testTimeBucketKeyExpiry() + { + testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1)); + testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1)); + + long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); + testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); + + long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) ); + + long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) ); + + //Check for expiry of time1 now + Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) ); + + testMeta.timeBucketAssigner.teardown(); + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java deleted file mode 100644 index 8ca0960..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.apex.malhar.lib.state.managed; - -import java.io.IOException; - -import org.joda.time.Duration; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import com.datatorrent.lib.util.KryoCloneUtils; - -public class TimeBucketAssignerTest -{ - - class TestMeta extends TestWatcher - { - TimeBucketAssigner timeBucketAssigner; - MockManagedStateContext mockManagedStateContext; - - @Override - protected void starting(Description description) - { - timeBucketAssigner = new TimeBucketAssigner(); - mockManagedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9)); - } - - @Override - protected void finished(Description description) - { - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Test - public void testSerde() throws IOException - { - TimeBucketAssigner deserialized = KryoCloneUtils.cloneObject(testMeta.timeBucketAssigner); - Assert.assertNotNull("time bucket assigner", deserialized); - } - - @Test - public void testNumBuckets() - { - testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1)); - testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30)); - - testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); - - Assert.assertEquals("num buckets", 2, testMeta.timeBucketAssigner.getNumBuckets()); - testMeta.timeBucketAssigner.teardown(); - } - - @Test - public void testTimeBucketKey() - { - testMeta.timeBucketAssigner.setExpireBefore(Duration.standardHours(1)); - testMeta.timeBucketAssigner.setBucketSpan(Duration.standardMinutes(30)); - - long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); - testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); - - long time1 = referenceTime - Duration.standardMinutes(2).getMillis(); - Assert.assertEquals("time bucket", 1, 
testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1)); - - long time0 = referenceTime - Duration.standardMinutes(40).getMillis(); - Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0)); - - long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis(); - Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime)); - testMeta.timeBucketAssigner.teardown(); - } - - @Test - public void testTimeBucketKeyExpiry() - { - testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1)); - testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1)); - - long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); - testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); - - long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; - Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); - - long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; - Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) ); - - //Check for expiry of time1 now - Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); - - testMeta.timeBucketAssigner.teardown(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4ab457f1/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index f898e2d..512626e 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -58,7 +58,8 @@ import com.datatorrent.lib.util.KeyValPair; public class WindowedOperatorTest { - public static final long BASE = (System.currentTimeMillis() / 1000) * 1000; + // To test the extreme condition counting from + public static final long BASE = ((System.currentTimeMillis() - 3600000L * 24 * 365) / 1000) * 1000; @Parameterized.Parameters public static Collection<Object[]> testParameters()
