Removed checking all the window ids in idempotency storage before replay

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9

Branch: refs/heads/feature-AppData
Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c
Parents: a57a3d7
Author: ishark <[email protected]>
Authored: Fri Aug 14 14:37:24 2015 -0700
Committer: ishark <[email protected]>
Committed: Fri Aug 14 14:37:24 2015 -0700

----------------------------------------------------------------------
 .../redis/AbstractRedisInputOperator.java       | 23 ++++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 7f79bd0..260fbf6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
  * @category Input
  * @tags redis, key value
  *
- * @param <T> The tuple type.
+ * @param <T>
+ *          The tuple type.
  * @since 0.9.3
  */
 public abstract class AbstractRedisInputOperator<T> extends 
AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
@@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractStoreInputOp
   private transient Integer backupOffset;
   private int scanCount;
   private transient boolean replay;
+  private transient boolean skipOffsetRecovery = true;
 
   @NotNull
   private IdempotentStorageManager idempotentStorageManager;
@@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractStoreInputOp
   private void replay(long windowId)
   {
     try {
-      if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+      // For first recovered window, offset is already part of recovery state.
+      // So skip reading from idempotency manager
+      if (!skipOffsetRecovery) {
         // Begin offset for this window is recovery offset stored for the last
         // window
         RecoveryState recoveryStateForLastWindow = (RecoveryState) 
getIdempotentStorageManager().load(context.getId(), windowId - 1);
         recoveryState.scanOffsetAtBeginWindow = 
recoveryStateForLastWindow.scanOffsetAtBeginWindow;
       }
-
+      skipOffsetRecovery = false;
       RecoveryState recoveryStateForCurrentWindow = (RecoveryState) 
getIdempotentStorageManager().load(context.getId(), windowId);
       recoveryState.numberOfScanCallsInWindow = 
recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
       if (recoveryState.scanOffsetAtBeginWindow != null) {
         scanOffset = recoveryState.scanOffsetAtBeginWindow;
       }
       replay = true;
+
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
   }
 
-  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) 
throws IOException
-  {
-    long[] windowsIds = 
getIdempotentStorageManager().getWindowIds(context.getId());
-    if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > 
windowsIds[windowsIds.length - 1]) {
-      return false;
-    }
-    return true ;
-  }
-
   private void scanKeysFromOffset()
   {
     if (!scanComplete) {
@@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> 
extends AbstractStoreInputOp
     scanComplete = false;
     scanParameters = new ScanParams();
     scanParameters.count(scanCount);
+    
     // For the 1st window after checkpoint, windowID - 1 would not have recovery
     // offset stored in idempotentStorageManager
     // But recoveryOffset is non-transient, so will be recovered with
     // checkPointing
+    // Offset recovery from idempotency storage can be skipped in this case
     scanOffset = recoveryState.scanOffsetAtBeginWindow;
+    skipOffsetRecovery = true;
   }
 
   @Override

Reply via email to