shekhars-li commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297851046


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
                       checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
                       taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
                       jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
+              future.whenComplete((r, e) -> {
                 updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
-              }
+                closeTaskRestoreManager(taskRestoreManager, taskName);
+              });
+              newTaskCheckpoints.put(taskInstanceName, future);
             } else {
               // log and rethrow exception to communicate restore failure
               String msg = String.format("Error restoring state for task: %s", taskName);
               LOG.error(msg, ex);
               throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
             }
-          }
-
-          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to continue processing/backups
-            // if restore manager close fails.
+          } else {
+            // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+            // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+            // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+            // before async restoreDeletedSnapshot() is complete.
+            closeTaskRestoreManager(taskRestoreManager, taskName);

Review Comment:
   Updated that. Thanks
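A minimal sketch of why the close moved into whenComplete, using only java.util.concurrent.CompletableFuture. FakeRestoreManager, closeEagerly and closeOnCompletion are hypothetical names invented for illustration; they are not Samza's TaskRestoreManager API. A manually completed future stands in for the async restoreDeletedSnapshot() work mentioned in the NOTE comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: FakeRestoreManager, closeEagerly and closeOnCompletion are
// hypothetical names, not Samza's TaskRestoreManager API.
public class RestoreCloseSketch {

  /** Hypothetical stand-in for a restore manager; it just records lifecycle events. */
  static final class FakeRestoreManager {
    final List<String> events = new ArrayList<>();
    boolean closed = false;

    void close() {
      closed = true;
      events.add("closed");
    }
  }

  /** Mirrors the removed code: close right after scheduling, even if restore is still running. */
  static CompletableFuture<String> closeEagerly(CompletableFuture<String> restore,
      FakeRestoreManager manager) {
    manager.close(); // runs immediately, possibly while the restore is still in flight
    return restore;
  }

  /** Mirrors the added code: defer close() until the restore finishes, successfully or not. */
  static CompletableFuture<String> closeOnCompletion(CompletableFuture<String> restore,
      FakeRestoreManager manager) {
    // whenComplete fires on normal and exceptional completion; the returned stage carries
    // the same result or failure as 'restore', but completes only after this action has run.
    return restore.whenComplete((result, error) -> {
      manager.events.add(error == null ? "restore succeeded" : "restore failed");
      manager.close();
    });
  }

  public static void main(String[] args) {
    // Eager close: the manager is already closed while the restore is still pending.
    FakeRestoreManager eager = new FakeRestoreManager();
    CompletableFuture<String> pendingEager = new CompletableFuture<>();
    closeEagerly(pendingEager, eager);
    System.out.println("eager: closed while pending = " + eager.closed);

    // Deferred close, success path.
    FakeRestoreManager ok = new FakeRestoreManager();
    CompletableFuture<String> pending = new CompletableFuture<>();
    CompletableFuture<String> tracked = closeOnCompletion(pending, ok);
    System.out.println("deferred: closed while pending = " + ok.closed);
    pending.complete("checkpoint-1"); // simulates the async restore finishing
    System.out.println("deferred: result = " + tracked.join() + ", events = " + ok.events);

    // Deferred close, failure path: close() still runs and the failure still propagates.
    FakeRestoreManager bad = new FakeRestoreManager();
    CompletableFuture<String> failing = new CompletableFuture<>();
    CompletableFuture<String> trackedFailure = closeOnCompletion(failing, bad);
    failing.completeExceptionally(new IllegalStateException("simulated restore failure"));
    System.out.println("deferred: failed = " + trackedFailure.isCompletedExceptionally()
        + ", events = " + bad.events);
  }
}
```

Returning the stage produced by whenComplete, rather than the original future, is a choice made in this sketch so that callers joining it observe completion only after close() has run; the diff itself stores the original future in newTaskCheckpoints. In the branch where, per the NOTE comment, no async restore is pending, closing immediately is safe because nothing can still be using the manager.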



-- 
This is an automated message from the Apache Git Service.