Repository: apex-malhar Updated Branches: refs/heads/master d5c24dc8e -> a26a9f8b2
APEXMALHAR-2406 APEXMALHAR-2407 APEXMALHAR-2408 ManagedState Issues Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a26a9f8b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a26a9f8b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a26a9f8b Branch: refs/heads/master Commit: a26a9f8b2940986db313957fd3c9969489ab47ee Parents: d5c24dc Author: chaitanya <[email protected]> Authored: Sun Mar 19 17:40:33 2017 +0530 Committer: chaitanya <[email protected]> Committed: Sun Mar 19 17:40:55 2017 +0530 ---------------------------------------------------------------------- .../apex/malhar/lib/state/managed/Bucket.java | 11 +++-- .../lib/state/managed/BucketsFileSystem.java | 13 +++--- .../managed/IncrementalCheckpointManager.java | 5 +-- .../MovingBoundaryTimeBucketAssigner.java | 25 ++++++++++-- .../state/managed/BucketsFileSystemTest.java | 23 +++++++++-- .../lib/state/managed/DefaultBucketTest.java | 4 +- .../IncrementalCheckpointManagerTest.java | 43 +++++++++++++++++++- .../state/managed/ManagedTimeStateImplTest.java | 2 +- .../ManagedTimeUnifiedStateImplTest.java | 4 +- .../MovingBoundaryTimeBucketAssignerTest.java | 11 ++--- 10 files changed, 106 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 4f2cefd..6292fe2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -354,13 +354,19 @@ public interface Bucket extends 
ManagedStateComponent, KeyValueByteStreamProvide } /** - * Returns the value for the key from a time-bucket reader + * Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable. + * If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null. * @param key key * @param timeBucket time bucket - * @return value if key is found in the time bucket; false otherwise + * @return value if key is found in the time bucket; null otherwise */ private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket) { + + if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner && + timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) { + return null; + } FileAccess.FileReader fileReader = readers.get(timeBucket); if (fileReader != null) { return readValue(fileReader, key, timeBucket); @@ -469,7 +475,6 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide } } } - sizeInBytes.getAndAdd(-memoryFreed); //add the windowId to the queue to let operator thread release memory from keyStream and valueStream http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java index 2bd6ef7..510f3f5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java @@ -139,13 +139,16 @@ public class BucketsFileSystem implements ManagedStateComponent * @param data data of 
all time-buckets * @throws IOException */ - protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException + protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data, long latestPurgedTimeBucket) throws IOException { Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(), managedStateContext.getKeyComparator()); for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) { long timeBucketId = entry.getValue().getTimeBucket(); + if (timeBucketId <= latestPurgedTimeBucket) { + continue; + } timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue()); } @@ -273,17 +276,11 @@ public class BucketsFileSystem implements ManagedStateComponent */ private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException { - MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId); - if (tbm != null) { - return tbm; - } - if (exists(bucketId, META_FILE_NAME)) { + if (!timeBucketsMeta.containsRow(bucketId) && exists(bucketId, META_FILE_NAME)) { try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) { //Load meta info of all the time buckets of the bucket identified by bucketId. 
loadBucketMetaFile(bucketId, dis); } - } else { - return null; } return timeBucketsMeta.get(bucketId, timeBucketId); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index aa7cec7..ed40aa6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -136,11 +136,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) { long bucketId = singleBucket.getKey(); - if (bucketId > latestPurgedTimeBucket) { - managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue()); - } + managedStateContext.getBucketsFileSystem().writeBucketData(windowId, bucketId, singleBucket.getValue(), latestPurgedTimeBucket); } - committed(windowId); } catch (Throwable t) { throwable.set(t); LOG.debug("transfer window {}", windowId, t); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java index cc8ea0a..f3b40e1 100644 --- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java @@ -107,7 +107,20 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner } /** - * Get the bucket key for the long value and adjust boundaries if necessary. + * Get the bucket key for the long value and adjust boundaries if necessary. If the boundaries are adjusted, then determine + * whether triggerPurge is enabled. triggerPurge is enabled only when the lower bound changes. + * + * For example, + * ExpiryDuration = 2000 milliseconds, BucketSpan = 1000 milliseconds + * Times 0,...,999 belong to time bucket id 0, times 1000,...,1999 belong to bucket id 1, and so on. + * Initially start = 0, end = 2000, fixedStart = 0 + * (1) If the input has time 50 milliseconds, then it belongs to bucket id 0. + * + * (2) If the input has time 2100 milliseconds, then the boundary has to be adjusted. + * Values after the tuple is processed: diffInBuckets = 0, move = 1000, start = 1000, end = 3000, triggerPurge = true, lowestPurgeableTimeBucket = -1 + * + * (3) If the input has time 3200 milliseconds, then the boundary has to be adjusted. + * Values after the tuple is processed: diffInBuckets = 0, move = 1000, start = 2000, end = 4000, triggerPurge = true, lowestPurgeableTimeBucket = 0 * * @param time value from which bucket key is derived. * @return -1 if value is already expired; bucket key otherwise.
@@ -125,9 +138,11 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner long move = (diffInBuckets + 1) * bucketSpanMillis; start += move; end += move; - lowestPurgeableTimeBucket += diffInBuckets; // trigger purge when lower bound changes - triggerPurge = (diffInBuckets > 0); + triggerPurge = (move > 0); + if (triggerPurge) { + lowestPurgeableTimeBucket = ((start - fixedStart) / bucketSpanMillis) - 2; + } } return key; @@ -178,4 +193,8 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner this.expireBefore = expireBefore; } + public long getLowestPurgeableTimeBucket() + { + return lowestPurgeableTimeBucket; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java index ede2c85..28b3824 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java @@ -72,7 +72,7 @@ public class BucketsFileSystemTest { testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); - testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0); + testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1); testMeta.bucketsFileSystem.teardown(); @@ -83,10 +83,10 @@ public class BucketsFileSystemTest { testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = 
ManagedStateTestUtils.getTestBucketData(0, 100); - testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0); + testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1); Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100); - testMeta.bucketsFileSystem.writeBucketData(10, 0, more); + testMeta.bucketsFileSystem.writeBucketData(10, 0, more, -1); unsavedBucket0.putAll(more); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2); @@ -169,4 +169,21 @@ public class BucketsFileSystemTest Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId()); testMeta.bucketsFileSystem.teardown(); } + + @Test + public void testFirstKeyAfterTransferBuckets() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(50, 100); + testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0, -1); + + Map<Slice, Bucket.BucketedValue> unsavedBucket1 = ManagedStateTestUtils.getTestBucketData(24, 104); + testMeta.bucketsFileSystem.writeBucketData(20, 0, unsavedBucket1, -1); + + BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(0, 104); + Assert.assertNotNull(immutableTbm); + Assert.assertEquals("last transferred window", 20, immutableTbm.getLastTransferredWindowId()); + Assert.assertEquals("first key", "24", immutableTbm.getFirstKey().stringValue()); + testMeta.bucketsFileSystem.teardown(); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java index 
f7e24de..8a63f5a 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java @@ -98,7 +98,7 @@ public class DefaultBucketTest Slice one = ManagedStateTestUtils.getSliceFor("1"); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); - testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0); + testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1); @@ -115,7 +115,7 @@ public class DefaultBucketTest Slice one = ManagedStateTestUtils.getSliceFor("1"); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); - testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0); + testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java index a7e0827..fe4c20d 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java @@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.state.managed; import 
java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.datatorrent.api.Context; import com.datatorrent.lib.fileaccess.FileAccessFSImpl; @@ -133,6 +135,43 @@ public class IncrementalCheckpointManagerTest } @Test + public void testTransferWindowFilesExcludeExpiredBuckets() throws IOException, InterruptedException + { + testMeta.checkpointManager.setup(testMeta.managedStateContext); + + int startKeyBucket = 200; + Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = ManagedStateTestUtils.getTestData(startKeyBucket, startKeyBucket + 10, 0); + long latestExpiredTimeBucket = 102; + testMeta.checkpointManager.setLatestExpiredTimeBucket(latestExpiredTimeBucket); + testMeta.checkpointManager.save(buckets, 10, false); + //Need to synchronously call transfer window files so shutting down the other thread. 
+ testMeta.checkpointManager.teardown(); + Thread.sleep(500); + + testMeta.checkpointManager.committed(10); + testMeta.checkpointManager.transferWindowFiles(); + + // Retrieve the data which is not expired + Map<Long, Map<Slice, Bucket.BucketedValue>> bucketsValidData = new HashMap<>(); + for (int i = 0; i < 5; i++) { + Map<Slice, Bucket.BucketedValue> data = buckets.get((long)startKeyBucket + i); + Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap(); + for (Map.Entry<Slice,Bucket.BucketedValue> e: data.entrySet()) { + if (e.getValue().getTimeBucket() <= latestExpiredTimeBucket) { + continue; + } + bucketData.put(e.getKey(), e.getValue()); + } + bucketsValidData.put((long)startKeyBucket + i, bucketData); + } + + for (int i = 0; i < 5; i++) { + ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedStateContext.getFileAccess(), startKeyBucket + i, + bucketsValidData.get((long)startKeyBucket + i), 1); + } + } + + @Test public void testCommitted() throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(5); @@ -186,9 +225,9 @@ public class IncrementalCheckpointManagerTest @Override protected void writeBucketData(long windowId, long bucketId, Map<Slice, - Bucket.BucketedValue> data) throws IOException + Bucket.BucketedValue> data, long latestPurgedTimeBucket) throws IOException { - super.writeBucketData(windowId, bucketId, data); + super.writeBucketData(windowId, bucketId, data, latestPurgedTimeBucket); if (windowId == 10) { latch.countDown(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java index ed53f08..2882828 100644 --- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java @@ -86,7 +86,7 @@ public class ManagedTimeStateImplTest testMeta.managedState.setup(testMeta.operatorContext); Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time); - testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1); Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java index 1d2334d..42ab187 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -125,7 +125,7 @@ public class ManagedTimeUnifiedStateImplTest Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly - testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(), unsavedBucket0, 1); @@ -147,7 +147,7 @@ public class 
ManagedTimeUnifiedStateImplTest Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); //write data to disk explicitly - testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0, -1); ManagedStateTestUtils.validateBucketOnFileSystem(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(), unsavedBucket0, 1); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a26a9f8b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java index 2b132f4..97f6c20 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java @@ -113,28 +113,25 @@ public class MovingBoundaryTimeBucketAssignerTest long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue()); - long time0 = Duration.standardSeconds(0).getMillis() + referenceTime; Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) ); testMeta.timeBucketAssigner.endWindow(); - Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue()); + Assert.assertEquals("purgeLessThanEqualTo", -1, purgeLessThanEqualTo.longValue()); long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; Assert.assertEquals("time bucket", 10, 
testMeta.timeBucketAssigner.getTimeBucket(time1) ); testMeta.timeBucketAssigner.endWindow(); - Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue()); - purgeLessThanEqualTo.setValue(-2); + Assert.assertEquals("purgeLessThanEqualTo", 8, purgeLessThanEqualTo.longValue()); long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) ); testMeta.timeBucketAssigner.endWindow(); -// TODO: why is purgeLessThanEqualTo not moving to 8 here? - Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue()); + Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue()); //Check for expiry of time1 now Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) ); testMeta.timeBucketAssigner.endWindow(); - Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue()); + Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue()); testMeta.timeBucketAssigner.teardown(); }
