shekhars-li commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297850131
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
+ future.whenComplete((r, e) -> {
Review Comment:
@prateekm that wouldn't work, because the returned restoreFutures only represent
restores that completed successfully. In case of an exception, a new
checkpoint is created and put into a map (which is what we eventually
return). So we need to make sure taskRestoreManager is closed only after both
restoreFutures and newCheckpointFutures have completed.
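A minimal sketch of that ordering, assuming two independent futures. `FakeRestoreManager`, `closeWhenBothDone`, and the `String` payloads are hypothetical stand-ins, not Samza's actual types. `CompletableFuture.allOf` only completes once every input future has completed (normally or exceptionally), and `whenComplete` runs its callback in both cases, so the manager is not closed while the new-checkpoint future is still pending.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseAfterBothFutures {

  /** Hypothetical stand-in for taskRestoreManager: records whether close() was called. */
  static class FakeRestoreManager implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
      closed.set(true);
    }
  }

  /**
   * Closes the manager only after BOTH futures have completed, normally or exceptionally.
   * The returned future mirrors allOf: it fails if either input failed.
   */
  static CompletableFuture<Void> closeWhenBothDone(
      CompletableFuture<?> restoreFuture,
      CompletableFuture<?> newCheckpointFuture,
      FakeRestoreManager manager) {
    return CompletableFuture.allOf(restoreFuture, newCheckpointFuture)
        // whenComplete runs on success and on failure, so close() is never skipped
        .whenComplete((ignored, throwable) -> manager.close());
  }

  public static void main(String[] args) {
    CompletableFuture<String> restore = new CompletableFuture<>();
    CompletableFuture<String> newCheckpoint = new CompletableFuture<>();
    FakeRestoreManager manager = new FakeRestoreManager();
    CompletableFuture<Void> done = closeWhenBothDone(restore, newCheckpoint, manager);

    restore.complete("restored");
    // The checkpoint future is still pending, so the manager must stay open.
    System.out.println("closed after restore only: " + manager.closed.get());

    newCheckpoint.completeExceptionally(new RuntimeException("simulated restore failure"));
    System.out.println("closed after both: " + manager.closed.get());
    System.out.println("done exceptionally: " + done.isCompletedExceptionally());
  }
}
```

Completing only the restore future leaves the manager open; it is closed as soon as the second future finishes, even though that future failed.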
We can't return a map for restoreFutures because taskCheckpoints can be null,
and the concurrent map used for newTaskCheckpoints can't hold null values. (This is
also why we call putAll after get() on the returned map, updating the old map
rather than replacing it.)
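The null-value constraint and the in-place merge can be illustrated with plain JDK maps. This is a hedged sketch: `String` stands in for `TaskName` and `Checkpoint`, and `rejectsNullValues` and `mergeNewCheckpoints` are hypothetical helpers. `ConcurrentHashMap.put` throws `NullPointerException` for a null value while `HashMap` accepts one, so a map that must record "no checkpoint" as null cannot be a `ConcurrentHashMap`. Merging with `putAll` keeps the entries for tasks that did not get a new checkpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CheckpointMapMerge {

  /** Returns true if the map throws NullPointerException when given a null value. */
  static boolean rejectsNullValues(Map<String, String> map) {
    try {
      map.put("probe-task", null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  /**
   * Copies newly created checkpoints into the existing map instead of replacing it,
   * so tasks without a new checkpoint (including null entries) are preserved.
   */
  static Map<String, String> mergeNewCheckpoints(
      Map<String, String> taskCheckpoints,
      CompletableFuture<Map<String, String>> newCheckpointsFuture) {
    // join() waits like get() but throws an unchecked CompletionException on failure.
    taskCheckpoints.putAll(newCheckpointsFuture.join());
    return taskCheckpoints;
  }

  public static void main(String[] args) {
    System.out.println("HashMap rejects null: " + rejectsNullValues(new HashMap<>()));
    System.out.println("ConcurrentHashMap rejects null: "
        + rejectsNullValues(new ConcurrentHashMap<>()));

    Map<String, String> taskCheckpoints = new HashMap<>();
    taskCheckpoints.put("Partition 0", "old-checkpoint");
    taskCheckpoints.put("Partition 1", null); // this task has no checkpoint yet

    Map<String, String> newCheckpoints = new ConcurrentHashMap<>();
    newCheckpoints.put("Partition 0", "new-checkpoint");

    mergeNewCheckpoints(taskCheckpoints, CompletableFuture.completedFuture(newCheckpoints));
    System.out.println(taskCheckpoints.get("Partition 0")); // new-checkpoint
    System.out.println(taskCheckpoints.containsKey("Partition 1"));
  }
}
```

`join()` is used instead of `get()` only to avoid checked exceptions in the sketch; both wait for the future and return the same map.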
--
This is an automated message from the Apache Git Service.