Removed checking all the window ids in idempotency storage before replay
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9 Branch: refs/heads/feature-AppData Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c Parents: a57a3d7 Author: ishark <[email protected]> Authored: Fri Aug 14 14:37:24 2015 -0700 Committer: ishark <[email protected]> Committed: Fri Aug 14 14:37:24 2015 -0700 ---------------------------------------------------------------------- .../redis/AbstractRedisInputOperator.java | 23 ++++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 7f79bd0..260fbf6 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager; * @category Input * @tags redis, key value * - * @param <T> The tuple type. + * @param <T> + * The tuple type. * @since 0.9.3 */ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener @@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp private transient Integer backupOffset; private int scanCount; private transient boolean replay; + private transient boolean skipOffsetRecovery = true; @NotNull private IdempotentStorageManager idempotentStorageManager; @@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp private void replay(long windowId) { try { - if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) { + // For first recovered window, offset is already part of recovery state. + // So skip reading from idempotency manager + if (!skipOffsetRecovery) { // Begin offset for this window is recovery offset stored for the last // window RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1); recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow; } - + skipOffsetRecovery = false; RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId); recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow; if (recoveryState.scanOffsetAtBeginWindow != null) { scanOffset = recoveryState.scanOffsetAtBeginWindow; } replay = true; + } catch (IOException e) { DTThrowable.rethrow(e); } } - private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException - { - long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId()); - if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) { - return false; - } - return true ; - } - private void scanKeysFromOffset() { if (!scanComplete) { @@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp scanComplete = false; scanParameters = new ScanParams(); scanParameters.count(scanCount); + // For the 1st window after checkpoint, windowID - 1 would not have recovery // offset stored in idempotentStorageManager // But recoveryOffset is non-transient, so will be recovered with // checkPointing + // Offset recovery from idempotency storage can be skipped in this case scanOffset = recoveryState.scanOffsetAtBeginWindow; + skipOffsetRecovery = true; } @Override
