prateekm commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1298650351
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
+ // fails with closed taskRestoreManager
+
+ CompletableFuture<Boolean> shouldRetryRestoreFuture =
+ // check if the restore should be retried. If restore returned a
DeletedException, this future return true
Review Comment:
Move these inline comments into the Javadoc of the corresponding helper
methods (applies to all of the futures below).
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
+ // fails with closed taskRestoreManager
+
+ CompletableFuture<Boolean> shouldRetryRestoreFuture =
+ // check if the restore should be retried. If restore returned a
DeletedException, this future return true
+ shouldRetryFailedRestore(restoreFuture, startTime,
samzaContainerMetrics, taskInstanceName);
+
+ CompletableFuture<Checkpoint> taskCheckpointFuture =
+ // Creates a future for either the new task checkpoint if the
restore was retried, or returns the old task
+ // checkpoint otherwise
+ retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName,
taskCheckpoints, checkpointManager,
+ taskRestoreManager, config, taskInstanceMetrics, executor,
loggedStoreDir, jobContext, containerModel,
+ factoryName, taskBackendFactoryToStoreNames);
+
+ CompletableFuture<Void> cleanUpFuture =
+ // cleanup all the resources (like taskRestoreManager), update
restore time and update task checkpoint
+ // returned from the previous future in the chain
+ cleanUpResources(taskCheckpointFuture, startTime,
samzaContainerMetrics, taskInstanceName,
+ taskRestoreManager, newTaskCheckpoints);
+
+ restoreAndCleanupFutures.add(cleanUpFuture);
+ });
+ });
+
+ return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(aVoid -> newTaskCheckpoints);
+ }
+
+ private static CompletableFuture<Boolean>
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+ SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+ return restoreFuture.handle((aVoid, ex) -> {
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ // if the exception is of type DeletedException, retry restore (with
getDeleted flag set to true).
+ return true;
+ } else {
updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("Error restoring state for task: %s",
taskInstanceName.getTaskName());
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex); // wrap in unchecked exception to
throw from lambda
+ }
+ }
+ return false;
+ });
+ }
- if (ex != null) {
- if (isUnwrappedExceptionDeletedException(ex)) {
- LOG.warn("Received DeletedException during restore for task {}.
Attempting to get blobs with getDeleted flag",
- taskInstanceName.getTaskName());
-
- // Try to restore with getDeleted flag
- CompletableFuture<Checkpoint> future =
- restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
- checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
-
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
- jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore
task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
- updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
- }
- } else {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked
exception to throw from lambda
- }
- }
+ private static CompletableFuture<Checkpoint>
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+ TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints,
CheckpointManager checkpointManager,
+ TaskRestoreManager taskRestoreManager, Config config, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ ExecutorService executor, File loggedStoreBaseDirectory, JobContext
jobContext, ContainerModel containerModel,
+ String factoryName, Map<TaskName, Map<String, Set<String>>>
taskBackendFactoryToStoreNames) {
+ return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {
+ if (shouldRetryRestore) {
+ // Try to restore with getDeleted flag - return a new task checkpoint
+ return restoreDeletedSnapshot(taskName, taskCheckpoints,
checkpointManager, taskRestoreManager,
+ config, taskInstanceMetrics, executor,
+ taskBackendFactoryToStoreNames.get(taskName).get(factoryName),
loggedStoreBaseDirectory, jobContext,
+ containerModel);
+ }
+ // if shouldRetryRestore is false, do not retry restore. This means
restore completed successfully in the first try
+ // Return the old task checkpoint as no new checkpoint was created
+ return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+ });
+ }
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to
continue processing/backups
- // if restore manager close fails.
- }
- return null;
- });
- taskRestoreFutures.add(taskRestoreFuture);
- });
+ private static CompletableFuture<Void>
cleanUpResources(CompletableFuture<Checkpoint> taskCheckpointFuture,
+ long startTime, SamzaContainerMetrics samzaContainerMetrics, TaskName
taskName, TaskRestoreManager taskRestoreManager,
+ Map<TaskName, Checkpoint> newTaskCheckpoints) {
+ return taskCheckpointFuture.handle((checkpoint, throwable) -> {
+ // exception or not, update the restore time and close
TaskRestoreManager as there will be no more retries.
+ updateRestoreTime(startTime, samzaContainerMetrics, taskName);
+ closeTaskRestoreManager(taskRestoreManager, taskName.getTaskName());
+ if (throwable != null) {
+ throw new SamzaException("Exception from attempt to restore deleted
snapshot", throwable);
Review Comment:
Suggested exception message instead: "Error restoring state for task: {taskName}"
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -358,4 +403,20 @@ private static Boolean
isUnwrappedExceptionDeletedException(Throwable ex) {
FutureUtil.unwrapExceptions(SamzaException.class, ex));
return unwrappedException instanceof DeletedException;
}
+
+ /**
+ * Stop persistent stores.
+ * NOTE: Call this method concurrently for all {@link TaskRestoreManager}s
so that stop() can be parallelized.
+ * Certain persistent stores opened in BulkLoad mode are compacted on stop,
so paralleling stop()
+ * also parallelizes their compaction (a time-intensive operation).
+ */
+ private static void closeTaskRestoreManager(TaskRestoreManager
taskRestoreManager, String taskName) {
Review Comment:
This method can be inlined.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -133,8 +135,8 @@ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstan
Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
File loggedStoreDir, Set<String> forceRestoreTask) {
- Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+ Map<TaskName, Checkpoint> newTaskCheckpoints = new ConcurrentHashMap<>();
+ Queue<Future<Void>> restoreAndCleanupFutures = new
ConcurrentLinkedQueue<>();
Review Comment:
Why does this need to be a concurrent queue?
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
Review Comment:
It is sufficient to say "close taskRestoreManager after 1 (and 2 if retried) are
complete".
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
+ // fails with closed taskRestoreManager
+
+ CompletableFuture<Boolean> shouldRetryRestoreFuture =
+ // check if the restore should be retried. If restore returned a
DeletedException, this future return true
+ shouldRetryFailedRestore(restoreFuture, startTime,
samzaContainerMetrics, taskInstanceName);
+
+ CompletableFuture<Checkpoint> taskCheckpointFuture =
+ // Creates a future for either the new task checkpoint if the
restore was retried, or returns the old task
+ // checkpoint otherwise
+ retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName,
taskCheckpoints, checkpointManager,
+ taskRestoreManager, config, taskInstanceMetrics, executor,
loggedStoreDir, jobContext, containerModel,
+ factoryName, taskBackendFactoryToStoreNames);
+
+ CompletableFuture<Void> cleanUpFuture =
+ // cleanup all the resources (like taskRestoreManager), update
restore time and update task checkpoint
+ // returned from the previous future in the chain
+ cleanUpResources(taskCheckpointFuture, startTime,
samzaContainerMetrics, taskInstanceName,
+ taskRestoreManager, newTaskCheckpoints);
+
+ restoreAndCleanupFutures.add(cleanUpFuture);
+ });
+ });
+
+ return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(aVoid -> newTaskCheckpoints);
+ }
+
+ private static CompletableFuture<Boolean>
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+ SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+ return restoreFuture.handle((aVoid, ex) -> {
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ // if the exception is of type DeletedException, retry restore (with
getDeleted flag set to true).
+ return true;
+ } else {
updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("Error restoring state for task: %s",
taskInstanceName.getTaskName());
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex); // wrap in unchecked exception to
throw from lambda
+ }
+ }
+ return false;
+ });
+ }
- if (ex != null) {
- if (isUnwrappedExceptionDeletedException(ex)) {
- LOG.warn("Received DeletedException during restore for task {}.
Attempting to get blobs with getDeleted flag",
- taskInstanceName.getTaskName());
-
- // Try to restore with getDeleted flag
- CompletableFuture<Checkpoint> future =
- restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
- checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
-
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
- jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore
task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
- updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
- }
- } else {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked
exception to throw from lambda
- }
- }
+ private static CompletableFuture<Checkpoint>
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+ TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints,
CheckpointManager checkpointManager,
+ TaskRestoreManager taskRestoreManager, Config config, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ ExecutorService executor, File loggedStoreBaseDirectory, JobContext
jobContext, ContainerModel containerModel,
+ String factoryName, Map<TaskName, Map<String, Set<String>>>
taskBackendFactoryToStoreNames) {
+ return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {
Review Comment:
Blocker: Extract the contents of the lambdas to methods, and keep the future
chaining in the caller.
Makes it easier to understand the execution plan.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
+ // fails with closed taskRestoreManager
+
+ CompletableFuture<Boolean> shouldRetryRestoreFuture =
+ // check if the restore should be retried. If restore returned a
DeletedException, this future return true
+ shouldRetryFailedRestore(restoreFuture, startTime,
samzaContainerMetrics, taskInstanceName);
+
+ CompletableFuture<Checkpoint> taskCheckpointFuture =
Review Comment:
Rename this to restoreRetryFuture. The result type is already obvious from the
declared CompletableFuture<Checkpoint>.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName,
Checkpoint>> restoreAllTaskInstan
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to
ensure no inflight restore on taskRestoreManager
+ // fails with closed taskRestoreManager
+
+ CompletableFuture<Boolean> shouldRetryRestoreFuture =
+ // check if the restore should be retried. If restore returned a
DeletedException, this future return true
+ shouldRetryFailedRestore(restoreFuture, startTime,
samzaContainerMetrics, taskInstanceName);
+
+ CompletableFuture<Checkpoint> taskCheckpointFuture =
+ // Creates a future for either the new task checkpoint if the
restore was retried, or returns the old task
+ // checkpoint otherwise
+ retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName,
taskCheckpoints, checkpointManager,
+ taskRestoreManager, config, taskInstanceMetrics, executor,
loggedStoreDir, jobContext, containerModel,
+ factoryName, taskBackendFactoryToStoreNames);
+
+ CompletableFuture<Void> cleanUpFuture =
+ // cleanup all the resources (like taskRestoreManager), update
restore time and update task checkpoint
+ // returned from the previous future in the chain
+ cleanUpResources(taskCheckpointFuture, startTime,
samzaContainerMetrics, taskInstanceName,
+ taskRestoreManager, newTaskCheckpoints);
+
+ restoreAndCleanupFutures.add(cleanUpFuture);
+ });
+ });
+
+ return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(aVoid -> newTaskCheckpoints);
+ }
+
+ private static CompletableFuture<Boolean>
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+ SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+ return restoreFuture.handle((aVoid, ex) -> {
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ // if the exception is of type DeletedException, retry restore (with
getDeleted flag set to true).
+ return true;
+ } else {
updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("Error restoring state for task: %s",
taskInstanceName.getTaskName());
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex); // wrap in unchecked exception to
throw from lambda
+ }
+ }
+ return false;
+ });
+ }
- if (ex != null) {
- if (isUnwrappedExceptionDeletedException(ex)) {
- LOG.warn("Received DeletedException during restore for task {}.
Attempting to get blobs with getDeleted flag",
- taskInstanceName.getTaskName());
-
- // Try to restore with getDeleted flag
- CompletableFuture<Checkpoint> future =
- restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
- checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
-
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
- jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore
task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
- updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
- }
- } else {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked
exception to throw from lambda
- }
- }
+ private static CompletableFuture<Checkpoint>
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+ TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints,
CheckpointManager checkpointManager,
+ TaskRestoreManager taskRestoreManager, Config config, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ ExecutorService executor, File loggedStoreBaseDirectory, JobContext
jobContext, ContainerModel containerModel,
+ String factoryName, Map<TaskName, Map<String, Set<String>>>
taskBackendFactoryToStoreNames) {
+ return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {
+ if (shouldRetryRestore) {
+ // Try to restore with getDeleted flag - return a new task checkpoint
+ return restoreDeletedSnapshot(taskName, taskCheckpoints,
checkpointManager, taskRestoreManager,
+ config, taskInstanceMetrics, executor,
+ taskBackendFactoryToStoreNames.get(taskName).get(factoryName),
loggedStoreBaseDirectory, jobContext,
+ containerModel);
+ }
+ // if shouldRetryRestore is false, do not retry restore. This means
restore completed successfully in the first try
+ // Return the old task checkpoint as no new checkpoint was created
+ return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+ });
+ }
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to
continue processing/backups
- // if restore manager close fails.
- }
- return null;
- });
- taskRestoreFutures.add(taskRestoreFuture);
- });
+ private static CompletableFuture<Void>
cleanUpResources(CompletableFuture<Checkpoint> taskCheckpointFuture,
+ long startTime, SamzaContainerMetrics samzaContainerMetrics, TaskName
taskName, TaskRestoreManager taskRestoreManager,
+ Map<TaskName, Checkpoint> newTaskCheckpoints) {
+ return taskCheckpointFuture.handle((checkpoint, throwable) -> {
+ // exception or not, update the restore time and close
TaskRestoreManager as there will be no more retries.
+ updateRestoreTime(startTime, samzaContainerMetrics, taskName);
+ closeTaskRestoreManager(taskRestoreManager, taskName.getTaskName());
+ if (throwable != null) {
+ throw new SamzaException("Exception from attempt to restore deleted
snapshot", throwable);
+ }
+ if (checkpoint != null) {
+ newTaskCheckpoints.put(taskName, checkpoint);
Review Comment:
Minor: Consider completing the future with the checkpoint and merging the results
at the end, instead of mutating passed-in maps. Not a blocker.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]