prateekm commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1298650351


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager
+        //    fails with closed taskRestoreManager
+
+        CompletableFuture<Boolean> shouldRetryRestoreFuture =
+            // check if the restore should be retried. If restore returned a 
DeletedException, this future return true

Review Comment:
   Move these inline comments into the Javadoc of each of the new methods.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager
+        //    fails with closed taskRestoreManager
+
+        CompletableFuture<Boolean> shouldRetryRestoreFuture =
+            // check if the restore should be retried. If restore returned a 
DeletedException, this future return true
+            shouldRetryFailedRestore(restoreFuture, startTime, 
samzaContainerMetrics, taskInstanceName);
+
+        CompletableFuture<Checkpoint> taskCheckpointFuture =
+            // Creates a future for either the new task checkpoint if the 
restore was retried, or returns the old task
+            // checkpoint otherwise
+            retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName, 
taskCheckpoints, checkpointManager,
+                taskRestoreManager, config, taskInstanceMetrics, executor, 
loggedStoreDir, jobContext, containerModel,
+                factoryName, taskBackendFactoryToStoreNames);
+
+        CompletableFuture<Void> cleanUpFuture =
+            // cleanup all the resources (like taskRestoreManager), update 
restore time and update task checkpoint
+            // returned from the previous future in the chain
+            cleanUpResources(taskCheckpointFuture, startTime, 
samzaContainerMetrics, taskInstanceName,
+                taskRestoreManager, newTaskCheckpoints);
+
+        restoreAndCleanupFutures.add(cleanUpFuture);
+      });
+    });
+
+    return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new 
CompletableFuture[0]))
+        .thenApply(aVoid -> newTaskCheckpoints);
+  }
+
+  private static CompletableFuture<Boolean> 
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+      SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+    return restoreFuture.handle((aVoid, ex) -> {
+      if (ex != null) {
+        if (isUnwrappedExceptionDeletedException(ex)) {
+          // if the exception is of type DeletedException, retry restore (with 
getDeleted flag set to true).
+          return true;
+        } else {
           updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+          // log and rethrow exception to communicate restore failure
+          String msg = String.format("Error restoring state for task: %s", 
taskInstanceName.getTaskName());
+          LOG.error(msg, ex);
+          throw new SamzaException(msg, ex); // wrap in unchecked exception to 
throw from lambda
+        }
+      }
+      return false;
+    });
+  }
 
-          if (ex != null) {
-            if (isUnwrappedExceptionDeletedException(ex)) {
-              LOG.warn("Received DeletedException during restore for task {}. 
Attempting to get blobs with getDeleted flag",
-                  taskInstanceName.getTaskName());
-
-              // Try to restore with getDeleted flag
-              CompletableFuture<Checkpoint> future =
-                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
-                      checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor,
-                      
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), 
loggedStoreDir,
-                      jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore 
task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
-                updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
-              }
-            } else {
-              // log and rethrow exception to communicate restore failure
-              String msg = String.format("Error restoring state for task: %s", 
taskName);
-              LOG.error(msg, ex);
-              throw new SamzaException(msg, ex); // wrap in unchecked 
exception to throw from lambda
-            }
-          }
+  private static CompletableFuture<Checkpoint> 
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+      TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints, 
CheckpointManager checkpointManager,
+      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      ExecutorService executor, File loggedStoreBaseDirectory, JobContext 
jobContext, ContainerModel containerModel,
+      String factoryName, Map<TaskName, Map<String, Set<String>>> 
taskBackendFactoryToStoreNames) {
+    return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {
+      if (shouldRetryRestore) {
+        // Try to restore with getDeleted flag - return a new task checkpoint
+        return restoreDeletedSnapshot(taskName, taskCheckpoints, 
checkpointManager, taskRestoreManager,
+            config, taskInstanceMetrics, executor,
+            taskBackendFactoryToStoreNames.get(taskName).get(factoryName), 
loggedStoreBaseDirectory, jobContext,
+            containerModel);
+      }
+      // if shouldRetryRestore is false, do not retry restore. This means 
restore completed successfully in the first try
+      // Return the old task checkpoint as no new checkpoint was created
+      return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+    });
+  }
 
-          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} 
restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to 
continue processing/backups
-            // if restore manager close fails.
-          }
-          return null;
-        });
-        taskRestoreFutures.add(taskRestoreFuture);
-      });
+  private static CompletableFuture<Void> 
cleanUpResources(CompletableFuture<Checkpoint> taskCheckpointFuture,
+      long startTime, SamzaContainerMetrics samzaContainerMetrics, TaskName 
taskName, TaskRestoreManager taskRestoreManager,
+      Map<TaskName, Checkpoint> newTaskCheckpoints) {
+    return taskCheckpointFuture.handle((checkpoint, throwable) -> {
+      // exception or not, update the restore time and close 
TaskRestoreManager as there will be no more retries.
+      updateRestoreTime(startTime, samzaContainerMetrics, taskName);
+      closeTaskRestoreManager(taskRestoreManager, taskName.getTaskName());
+      if (throwable != null) {
+        throw new SamzaException("Exception from attempt to restore deleted 
snapshot", throwable);

Review Comment:
   Suggest using the same message as the other failure path: "Error restoring state for task: {taskName}"



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -358,4 +403,20 @@ private static Boolean 
isUnwrappedExceptionDeletedException(Throwable ex) {
         FutureUtil.unwrapExceptions(SamzaException.class, ex));
     return unwrappedException instanceof DeletedException;
   }
+
+  /**
+   * Stop persistent stores.
+   * NOTE: Call this method concurrently for all {@link TaskRestoreManager}s 
so that stop() can be parallelized.
+   * Certain persistent stores opened in BulkLoad mode are compacted on stop, 
so paralleling stop()
+   * also parallelizes their compaction (a time-intensive operation).
+   */
+  private static void closeTaskRestoreManager(TaskRestoreManager 
taskRestoreManager, String taskName) {

Review Comment:
   Can inline this method.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -133,8 +135,8 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> 
restoreAllTaskInstan
       Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
ExecutorService executor,
       File loggedStoreDir, Set<String> forceRestoreTask) {
 
-    Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new 
ConcurrentHashMap<>();
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+    Map<TaskName, Checkpoint> newTaskCheckpoints = new ConcurrentHashMap<>();
+    Queue<Future<Void>> restoreAndCleanupFutures = new 
ConcurrentLinkedQueue<>();

Review Comment:
   Why does this need to be a concurrent queue?



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager

Review Comment:
   It is sufficient to say: "Close taskRestoreManager after 1 (and 2, if retried) is 
complete."



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager
+        //    fails with closed taskRestoreManager
+
+        CompletableFuture<Boolean> shouldRetryRestoreFuture =
+            // check if the restore should be retried. If restore returned a 
DeletedException, this future return true
+            shouldRetryFailedRestore(restoreFuture, startTime, 
samzaContainerMetrics, taskInstanceName);
+
+        CompletableFuture<Checkpoint> taskCheckpointFuture =
+            // Creates a future for either the new task checkpoint if the 
restore was retried, or returns the old task
+            // checkpoint otherwise
+            retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName, 
taskCheckpoints, checkpointManager,
+                taskRestoreManager, config, taskInstanceMetrics, executor, 
loggedStoreDir, jobContext, containerModel,
+                factoryName, taskBackendFactoryToStoreNames);
+
+        CompletableFuture<Void> cleanUpFuture =
+            // cleanup all the resources (like taskRestoreManager), update 
restore time and update task checkpoint
+            // returned from the previous future in the chain
+            cleanUpResources(taskCheckpointFuture, startTime, 
samzaContainerMetrics, taskInstanceName,
+                taskRestoreManager, newTaskCheckpoints);
+
+        restoreAndCleanupFutures.add(cleanUpFuture);
+      });
+    });
+
+    return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new 
CompletableFuture[0]))
+        .thenApply(aVoid -> newTaskCheckpoints);
+  }
+
+  private static CompletableFuture<Boolean> 
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+      SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+    return restoreFuture.handle((aVoid, ex) -> {
+      if (ex != null) {
+        if (isUnwrappedExceptionDeletedException(ex)) {
+          // if the exception is of type DeletedException, retry restore (with 
getDeleted flag set to true).
+          return true;
+        } else {
           updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+          // log and rethrow exception to communicate restore failure
+          String msg = String.format("Error restoring state for task: %s", 
taskInstanceName.getTaskName());
+          LOG.error(msg, ex);
+          throw new SamzaException(msg, ex); // wrap in unchecked exception to 
throw from lambda
+        }
+      }
+      return false;
+    });
+  }
 
-          if (ex != null) {
-            if (isUnwrappedExceptionDeletedException(ex)) {
-              LOG.warn("Received DeletedException during restore for task {}. 
Attempting to get blobs with getDeleted flag",
-                  taskInstanceName.getTaskName());
-
-              // Try to restore with getDeleted flag
-              CompletableFuture<Checkpoint> future =
-                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
-                      checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor,
-                      
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), 
loggedStoreDir,
-                      jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore 
task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
-                updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
-              }
-            } else {
-              // log and rethrow exception to communicate restore failure
-              String msg = String.format("Error restoring state for task: %s", 
taskName);
-              LOG.error(msg, ex);
-              throw new SamzaException(msg, ex); // wrap in unchecked 
exception to throw from lambda
-            }
-          }
+  private static CompletableFuture<Checkpoint> 
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+      TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints, 
CheckpointManager checkpointManager,
+      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      ExecutorService executor, File loggedStoreBaseDirectory, JobContext 
jobContext, ContainerModel containerModel,
+      String factoryName, Map<TaskName, Map<String, Set<String>>> 
taskBackendFactoryToStoreNames) {
+    return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {

Review Comment:
   Blocker: Extract the contents of the lambdas into methods, and keep the future 
chaining in the caller.
   This makes the execution plan easier to understand.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager
+        //    fails with closed taskRestoreManager
+
+        CompletableFuture<Boolean> shouldRetryRestoreFuture =
+            // check if the restore should be retried. If restore returned a 
DeletedException, this future return true
+            shouldRetryFailedRestore(restoreFuture, startTime, 
samzaContainerMetrics, taskInstanceName);
+
+        CompletableFuture<Checkpoint> taskCheckpointFuture =

Review Comment:
   Rename this to restoreRetryFuture. The return type is already obvious from the 
declared type CompletableFuture<Checkpoint>.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -156,53 +158,90 @@ private static CompletableFuture<Map<TaskName, 
Checkpoint>> restoreAllTaskInstan
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager ONLY after 1 and 2 completes - to 
ensure no inflight restore on taskRestoreManager
+        //    fails with closed taskRestoreManager
+
+        CompletableFuture<Boolean> shouldRetryRestoreFuture =
+            // check if the restore should be retried. If restore returned a 
DeletedException, this future return true
+            shouldRetryFailedRestore(restoreFuture, startTime, 
samzaContainerMetrics, taskInstanceName);
+
+        CompletableFuture<Checkpoint> taskCheckpointFuture =
+            // Creates a future for either the new task checkpoint if the 
restore was retried, or returns the old task
+            // checkpoint otherwise
+            retryFailedRestore(shouldRetryRestoreFuture, taskInstanceName, 
taskCheckpoints, checkpointManager,
+                taskRestoreManager, config, taskInstanceMetrics, executor, 
loggedStoreDir, jobContext, containerModel,
+                factoryName, taskBackendFactoryToStoreNames);
+
+        CompletableFuture<Void> cleanUpFuture =
+            // cleanup all the resources (like taskRestoreManager), update 
restore time and update task checkpoint
+            // returned from the previous future in the chain
+            cleanUpResources(taskCheckpointFuture, startTime, 
samzaContainerMetrics, taskInstanceName,
+                taskRestoreManager, newTaskCheckpoints);
+
+        restoreAndCleanupFutures.add(cleanUpFuture);
+      });
+    });
+
+    return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new 
CompletableFuture[0]))
+        .thenApply(aVoid -> newTaskCheckpoints);
+  }
+
+  private static CompletableFuture<Boolean> 
shouldRetryFailedRestore(CompletableFuture<Void> restoreFuture, long startTime,
+      SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+    return restoreFuture.handle((aVoid, ex) -> {
+      if (ex != null) {
+        if (isUnwrappedExceptionDeletedException(ex)) {
+          // if the exception is of type DeletedException, retry restore (with 
getDeleted flag set to true).
+          return true;
+        } else {
           updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+          // log and rethrow exception to communicate restore failure
+          String msg = String.format("Error restoring state for task: %s", 
taskInstanceName.getTaskName());
+          LOG.error(msg, ex);
+          throw new SamzaException(msg, ex); // wrap in unchecked exception to 
throw from lambda
+        }
+      }
+      return false;
+    });
+  }
 
-          if (ex != null) {
-            if (isUnwrappedExceptionDeletedException(ex)) {
-              LOG.warn("Received DeletedException during restore for task {}. 
Attempting to get blobs with getDeleted flag",
-                  taskInstanceName.getTaskName());
-
-              // Try to restore with getDeleted flag
-              CompletableFuture<Checkpoint> future =
-                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
-                      checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor,
-                      
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), 
loggedStoreDir,
-                      jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore 
task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
-                updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
-              }
-            } else {
-              // log and rethrow exception to communicate restore failure
-              String msg = String.format("Error restoring state for task: %s", 
taskName);
-              LOG.error(msg, ex);
-              throw new SamzaException(msg, ex); // wrap in unchecked 
exception to throw from lambda
-            }
-          }
+  private static CompletableFuture<Checkpoint> 
retryFailedRestore(CompletableFuture<Boolean> shouldRetryRestoreFuture,
+      TaskName taskName, Map<TaskName, Checkpoint> taskCheckpoints, 
CheckpointManager checkpointManager,
+      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      ExecutorService executor, File loggedStoreBaseDirectory, JobContext 
jobContext, ContainerModel containerModel,
+      String factoryName, Map<TaskName, Map<String, Set<String>>> 
taskBackendFactoryToStoreNames) {
+    return shouldRetryRestoreFuture.thenCompose(shouldRetryRestore -> {
+      if (shouldRetryRestore) {
+        // Try to restore with getDeleted flag - return a new task checkpoint
+        return restoreDeletedSnapshot(taskName, taskCheckpoints, 
checkpointManager, taskRestoreManager,
+            config, taskInstanceMetrics, executor,
+            taskBackendFactoryToStoreNames.get(taskName).get(factoryName), 
loggedStoreBaseDirectory, jobContext,
+            containerModel);
+      }
+      // if shouldRetryRestore is false, do not retry restore. This means 
restore completed successfully in the first try
+      // Return the old task checkpoint as no new checkpoint was created
+      return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+    });
+  }
 
-          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} 
restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to 
continue processing/backups
-            // if restore manager close fails.
-          }
-          return null;
-        });
-        taskRestoreFutures.add(taskRestoreFuture);
-      });
+  private static CompletableFuture<Void> 
cleanUpResources(CompletableFuture<Checkpoint> taskCheckpointFuture,
+      long startTime, SamzaContainerMetrics samzaContainerMetrics, TaskName 
taskName, TaskRestoreManager taskRestoreManager,
+      Map<TaskName, Checkpoint> newTaskCheckpoints) {
+    return taskCheckpointFuture.handle((checkpoint, throwable) -> {
+      // exception or not, update the restore time and close 
TaskRestoreManager as there will be no more retries.
+      updateRestoreTime(startTime, samzaContainerMetrics, taskName);
+      closeTaskRestoreManager(taskRestoreManager, taskName.getTaskName());
+      if (throwable != null) {
+        throw new SamzaException("Exception from attempt to restore deleted 
snapshot", throwable);
+      }
+      if (checkpoint != null) {
+        newTaskCheckpoints.put(taskName, checkpoint);

Review Comment:
   Minor: Consider completing the future with the checkpoint and merging the results at the 
end, instead of mutating passed-in maps. Not a blocker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to