Repository: apex-malhar Updated Branches: refs/heads/master 26fa9d781 -> d1fb2b604
APEXMALHAR-2129 removed moving time boundaries periodically and asynchronously based on system time Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d797bdd1 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d797bdd1 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d797bdd1 Branch: refs/heads/master Commit: d797bdd12ca710c209cb9980119e54af2adcae57 Parents: 37cb584 Author: Chandni Singh <[email protected]> Authored: Wed Aug 10 14:09:30 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Thu Aug 11 17:28:22 2016 -0700 ---------------------------------------------------------------------- .../state/managed/AbstractManagedStateImpl.java | 1 - .../lib/state/managed/ManagedStateImpl.java | 2 +- .../lib/state/managed/ManagedTimeStateImpl.java | 6 +- .../managed/ManagedTimeUnifiedStateImpl.java | 6 +- .../lib/state/managed/TimeBucketAssigner.java | 96 +++++++------------- .../ManagedTimeUnifiedStateImplTest.java | 4 +- .../state/managed/TimeBucketAssignerTest.java | 34 +------ 7 files changed, 47 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index b5b9f8c..927a6df 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -245,7 +245,6 @@ public abstract class AbstractManagedStateImpl if (throwable.get() != null) { 
Throwables.propagate(throwable.get()); } - timeBucketAssigner.beginWindow(windowId); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java index b4453d5..ba8cdc6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java @@ -55,7 +55,7 @@ public class ManagedStateImpl extends AbstractManagedStateImpl implements Bucket @Override public void put(long bucketId, @NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); putInBucket(bucketId, timeBucket, key, value); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java index b441183..eddc736 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java @@ -48,7 +48,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = 
timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); putInBucket(bucketId, timeBucket, key, value); } @@ -61,7 +61,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public Slice getSync(long bucketId, long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); if (timeBucket == -1) { //time is expired so no point in looking further. return BucketedState.EXPIRED; @@ -78,7 +78,7 @@ public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements Ti @Override public Future<Slice> getAsync(long bucketId, long time, Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); if (timeBucket == -1) { //time is expired so no point in looking further. return Futures.immediateFuture(BucketedState.EXPIRED); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java index 487f89c..e04286e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java @@ -67,14 +67,14 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem @Override public void put(long time, @NotNull Slice key, @NotNull Slice value) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); 
putInBucket(timeBucket, timeBucket, key, value); } @Override public Slice getSync(long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); if (timeBucket == -1) { //time is expired so return expired slice. return BucketedState.EXPIRED; @@ -85,7 +85,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem @Override public Future<Slice> getAsync(long time, @NotNull Slice key) { - long timeBucket = timeBucketAssigner.getTimeBucketFor(time); + long timeBucket = timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time); if (timeBucket == -1) { //time is expired so return expired slice. return Futures.immediateFuture(BucketedState.EXPIRED); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java index a60bc72..435ffe2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java @@ -28,10 +28,9 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Preconditions; import com.datatorrent.api.Context; -import com.datatorrent.lib.appdata.query.WindowBoundedService; /** - * Keeps track of time buckets.<br/> + * Keeps track of time buckets and triggers purging of obsolete time-buckets.<br/> * * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets, * which time-bucket an event falls into and sliding the time boundaries. 
@@ -40,23 +39,16 @@ import com.datatorrent.lib.appdata.query.WindowBoundedService; * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/> * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and - * <code>rererenceInstant = current-time</code>, then <code> + * <code>rererenceInstant = currentTime</code>, then <code> * numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/> * * These properties once configured shouldn't be changed because that will result in different time-buckets * for the same (key,time) pair after a failure. * <p/> * - * The time boundaries- start and end, periodically move by span of a single time-bucket. Any event with time < start - * is expired. These boundaries slide between application window by the expiry task asynchronously.<br/> - * The boundaries move only between an application window to ensure consistency of a checkpoint. Checkpoint will happen - * at application window boundaries so if we do not restrict moving start and end within an app window boundary, it may - * happen that old value of 'start' is saved with the new value of 'end'. - * - * <p/> - * - * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to this - * method can be ahead of <code>end</code>. This means that the corresponding event is a future event + * The time boundaries- start and end, move by multiples of time-bucket span. Any event with time < start + * is considered expired. The boundaries slide by {@link #getTimeBucketAndAdjustBoundaries(long)}. The time which is passed as an + * argument to this method can be ahead of <code>end</code>. This means that the corresponding event is a future event * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further. 
* * @since 3.4.0 @@ -79,30 +71,12 @@ public class TimeBucketAssigner implements ManagedStateComponent private long end; private int numBuckets; private transient long fixedStart; - private transient long lowestTimeBucket; + private transient boolean triggerPurge; + private transient long lowestPurgeableTimeBucket; private boolean initialized; - private transient WindowBoundedService windowBoundedService; - - private transient PurgeListener purgeListener = null; - - private final transient Runnable expiryTask = new Runnable() - { - @Override - public void run() - { - synchronized (lock) { - start += bucketSpanMillis; - end += bucketSpanMillis; - if (purgeListener != null) { - purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++); - } - } - } - }; - - private final transient Object lock = new Object(); + private transient PurgeListener purgeListener; @Override public void setup(@NotNull ManagedStateContext managedStateContext) @@ -122,55 +96,55 @@ public class TimeBucketAssigner implements ManagedStateComponent initialized = true; } - lowestTimeBucket = (start - fixedStart) / bucketSpanMillis; - windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask); - windowBoundedService.setup(context); } - public void beginWindow(long windowId) + public void endWindow() { - windowBoundedService.beginWindow(windowId); + if (triggerPurge && purgeListener != null) { + triggerPurge = false; + purgeListener.purgeTimeBucketsLessThanEqualTo(lowestPurgeableTimeBucket); + } } - public void endWindow() + @Override + public void teardown() { - windowBoundedService.endWindow(); } /** - * Get the bucket key for the long value. + * Get the bucket key for the long value and adjust boundaries if necessary. * * @param value value from which bucket key is derived. * @return -1 if value is already expired; bucket key otherwise. 
*/ - public long getTimeBucketFor(long value) + public long getTimeBucketAndAdjustBoundaries(long value) { - synchronized (lock) { - if (value < start) { - return -1; - } - long diffFromStart = value - fixedStart; - long key = diffFromStart / bucketSpanMillis; - if (value > end) { - long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis; - start += move; - end += move; - } - return key; + if (value < start) { + return -1; } + long diffFromStart = value - fixedStart; + long key = diffFromStart / bucketSpanMillis; + if (value > end) { + long diffInBuckets = (value - end) / bucketSpanMillis; + long move = (diffInBuckets + 1) * bucketSpanMillis; + start += move; + end += move; + triggerPurge = true; + lowestPurgeableTimeBucket += diffInBuckets; + } + return key; + } + /** + * Sets the purge listener. + * @param purgeListener purge listener + */ public void setPurgeListener(@NotNull PurgeListener purgeListener) { this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener"); } - @Override - public void teardown() - { - windowBoundedService.teardown(); - } - /** * @return number of buckets. 
*/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java index c7920de..6808b63 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -110,7 +110,7 @@ public class ManagedTimeUnifiedStateImplTest testMeta.managedState.setup(testMeta.operatorContext); - long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time); + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly @@ -132,7 +132,7 @@ public class ManagedTimeUnifiedStateImplTest testMeta.managedState.setup(testMeta.operatorContext); - long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time); + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(time); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d797bdd1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java index 952b4f6..4ceef1f 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java @@ -20,8 +20,6 @@ package org.apache.apex.malhar.lib.state.managed; import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import org.joda.time.Duration; import org.junit.Assert; @@ -85,39 +83,13 @@ public class TimeBucketAssignerTest testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); long time1 = referenceTime - Duration.standardMinutes(2).getMillis(); - Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketFor(time1)); + Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1)); long time0 = referenceTime - Duration.standardMinutes(40).getMillis(); - Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketFor(time0)); + Assert.assertEquals("time bucket", 0, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time0)); long expiredTime = referenceTime - Duration.standardMinutes(65).getMillis(); - Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketFor(expiredTime)); + Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime)); testMeta.timeBucketAssigner.teardown(); } - - @Test - public void testSlidingOnlyBetweenWindow() throws InterruptedException - { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger timesCalled = new AtomicInteger(); - testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener() - { - @Override - public void purgeTimeBucketsLessThanEqualTo(long timeBucket) - { - timesCalled.getAndIncrement(); - latch.countDown(); - 
} - }); - - testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); - testMeta.timeBucketAssigner.beginWindow(0); - latch.await(); - testMeta.timeBucketAssigner.endWindow(); - int valueBeforeSleep = timesCalled.get(); - Thread.sleep(1000); - Assert.assertEquals("value should not change", valueBeforeSleep, timesCalled.get()); - testMeta.timeBucketAssigner.teardown(); - } - }
