Repository: apex-malhar
Updated Branches:
  refs/heads/master e22ea0de1 -> 1ae14c03a


APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1ae14c03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1ae14c03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1ae14c03

Branch: refs/heads/master
Commit: 1ae14c03a73e2814b57ce06680db57da80f0b42b
Parents: 9043f9d
Author: Thomas Weise <[email protected]>
Authored: Sun Jan 22 13:19:44 2017 -0800
Committer: David Yan <[email protected]>
Committed: Sun Jan 22 21:45:28 2017 -0800

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  3 +++
 .../managed/IncrementalCheckpointManager.java   |  1 +
 .../MovingBoundaryTimeBucketAssigner.java       |  5 ++--
 .../lib/state/managed/ManagedStateImplTest.java |  4 ++--
 .../MovingBoundaryTimeBucketAssignerTest.java   | 25 ++++++++++++++++++++
 5 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index daae2d8..f676b84 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -220,6 +220,8 @@ public abstract class AbstractManagedStateImpl
               buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), 
bucketEntry.getValue());
             }
           }
+          // Skip writing to the WAL while recovering (replaying) from the WAL;
+          // the data only needs to be transferred to the bucket data files.
           checkpointManager.save(state, stateEntry.getKey(), true 
/*skipWritingToWindowFile*/);
         }
       } catch (IOException e) {
@@ -369,6 +371,7 @@ public abstract class AbstractManagedStateImpl
     }
     if (!flashData.isEmpty()) {
       try {
+        // write incremental state to WAL (skipWritingToWindowFile=false) before the checkpoint
         checkpointManager.save(flashData, windowId, false);
       } catch (IOException e) {
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 65c1d1e..aa7cec7 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -111,6 +111,7 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
           if (latestExpiredTimeBucket.get() > -1) {
             try {
               latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+              //LOG.debug("latestPurgedTimeBucket {}", latestPurgedTimeBucket);
               
managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
             } catch (IOException e) {
               throwable.set(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index ece7686..cc8ea0a 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -67,7 +67,7 @@ public class MovingBoundaryTimeBucketAssigner extends 
TimeBucketAssigner
   private int numBuckets;
   private transient long fixedStart;
   private transient boolean triggerPurge;
-  private transient long lowestPurgeableTimeBucket;
+  private transient long lowestPurgeableTimeBucket = -1;
 
 
   @Override
@@ -125,8 +125,9 @@ public class MovingBoundaryTimeBucketAssigner extends 
TimeBucketAssigner
       long move = (diffInBuckets + 1) * bucketSpanMillis;
       start += move;
       end += move;
-      triggerPurge = true;
       lowestPurgeableTimeBucket += diffInBuckets;
+      // trigger purge when lower bound changes
+      triggerPurge = (diffInBuckets > 0);
     }
     return key;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index 99e6c23..dab3925 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -177,7 +177,7 @@ public class ManagedStateImplTest
     testMeta.managedState.setup(testMeta.operatorContext);
 
     int numKeys = 300;
-    long lastWindowId = (long)numKeys;
+    long lastWindowId = numKeys;
 
     for (long windowId = 0L; windowId < lastWindowId; windowId++) {
       testMeta.managedState.beginWindow(windowId);
@@ -197,7 +197,7 @@ public class ManagedStateImplTest
     for (int key = numKeys - 1; key > 0; key--) {
       Slice keyVal = ManagedStateTestUtils.getSliceFor(Integer.toString(key));
       Slice val = testMeta.managedState.getSync(0L, keyVal);
-      Assert.assertNotNull(val);
+      Assert.assertNotNull("null value for key " + key, val);
     }
 
     testMeta.managedState.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index e4e5d2e..2b132f4 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.lang3.mutable.MutableLong;
+
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class MovingBoundaryTimeBucketAssignerTest
@@ -96,20 +98,43 @@ public class MovingBoundaryTimeBucketAssignerTest
   @Test
   public void testTimeBucketKeyExpiry()
   {
+    final MutableLong purgeLessThanEqualTo = new MutableLong(-2);
     testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
     testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+    testMeta.timeBucketAssigner.setPurgeListener(new 
TimeBucketAssigner.PurgeListener()
+    {
+      @Override
+      public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+      {
+        purgeLessThanEqualTo.setValue(timeBucket);
+      }
+    });
 
     long referenceTime = 
testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
     testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+    Assert.assertEquals("purgeLessThanEqualTo", -2L, 
purgeLessThanEqualTo.longValue());
+
+    long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
+    Assert.assertEquals("time bucket", 1, 
testMeta.timeBucketAssigner.getTimeBucket(time0) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, 
purgeLessThanEqualTo.longValue());
 
     long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
     Assert.assertEquals("time bucket", 10, 
testMeta.timeBucketAssigner.getTimeBucket(time1) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", 7, 
purgeLessThanEqualTo.longValue());
+    purgeLessThanEqualTo.setValue(-2);
 
     long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
     Assert.assertEquals("time bucket", 11, 
testMeta.timeBucketAssigner.getTimeBucket(time2) );
+    testMeta.timeBucketAssigner.endWindow();
+    // TODO: why is purgeLessThanEqualTo not moving to 8 here?
+    Assert.assertEquals("purgeLessThanEqualTo", -2, 
purgeLessThanEqualTo.longValue());
 
     //Check for expiry of time1 now
     Assert.assertEquals("time bucket", -1, 
testMeta.timeBucketAssigner.getTimeBucket(time1) );
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, 
purgeLessThanEqualTo.longValue());
 
     testMeta.timeBucketAssigner.teardown();
   }

Reply via email to