shekhars-li commented on code in PR #1682: URL: https://github.com/apache/samza/pull/1682#discussion_r1297851046
########## samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java: ########## @@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor, taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir, jobContext, containerModel); - try { - newTaskCheckpoints.put(taskInstanceName, future); - } catch (Exception e) { - String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName); - throw new SamzaException(msg, e); - } finally { + future.whenComplete((r, e) -> { updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName); - } + closeTaskRestoreManager(taskRestoreManager, taskName); + }); + newTaskCheckpoints.put(taskInstanceName, future); } else { // log and rethrow exception to communicate restore failure String msg = String.format("Error restoring state for task: %s", taskName); LOG.error(msg, ex); throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda } - } - - // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted - // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). - try { - taskRestoreManager.close(); - } catch (Exception e) { - LOG.error("Error closing restore manager for task: {} after {} restore", taskName, - ex != null ? "unsuccessful" : "successful", e); - // ignore exception from close. container may still be able to continue processing/backups - // if restore manager close fails. + } else { + // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted + // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). + // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed + // before async restoreDeletedSnapshot() is complete. + closeTaskRestoreManager(taskRestoreManager, taskName); Review Comment: Updated that. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org