shekhars-li commented on code in PR #1676: URL: https://github.com/apache/samza/pull/1676#discussion_r1289157811
########## samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java: ########## @@ -301,79 +303,28 @@ private void restoreStores() throws InterruptedException { samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock); taskRestoreManagers.put(taskName, taskStoreRestoreManagers); + taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames); }); - // Initialize each TaskStorageManager - taskRestoreManagers.forEach((taskName, restoreManagers) -> - restoreManagers.forEach((factoryName, taskRestoreManager) -> - taskRestoreManager.init(taskCheckpoints.get(taskName)) - ) - ); - - // Start each store consumer once. - // Note: These consumers are per system and only changelog system store consumers will be started. - // Some TaskRestoreManagers may not require the consumer to to be started, but due to the agnostic nature of - // ContainerStorageManager we always start the changelog consumer here in case it is required - this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start); - - List<Future<Void>> taskRestoreFutures = new ArrayList<>(); - - // Submit restore callable for each taskInstance - taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> { - // Submit for each restore factory - restoreManagersMap.forEach((factoryName, taskRestoreManager) -> { - long startTime = System.currentTimeMillis(); - String taskName = taskInstance.getTaskName(); - LOG.info("Starting restore for state for task: {}", taskName); - CompletableFuture<Void> restoreFuture = taskRestoreManager.restore().handle((res, ex) -> { - // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted - // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). 
- try { - taskRestoreManager.close(); - } catch (Exception e) { - LOG.error("Error closing restore manager for task: {} after {} restore", - taskName, ex != null ? "unsuccessful" : "successful", e); - // ignore exception from close. container may still be be able to continue processing/backups - // if restore manager close fails. - } - - long timeToRestore = System.currentTimeMillis() - startTime; - if (samzaContainerMetrics != null) { - Gauge taskGauge = samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, null); - - if (taskGauge != null) { - taskGauge.set(timeToRestore); - } - } - - if (ex != null) { - // log and rethrow exception to communicate restore failure - String msg = String.format("Error restoring state for task: %s", taskName); - LOG.error(msg, ex); - throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda - } else { - return null; - } - }); - - taskRestoreFutures.add(restoreFuture); - }); - }); + // Init all taskRestores and if successful, restores all the task stores concurrently + CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture = + ContainerStorageManagerUtil.initAndRestoreTaskInstances(taskRestoreManagers, samzaContainerMetrics, + checkpointManager, jobContext, containerModel, taskCheckpoints, taskBackendFactoryToStoreNames, config, + restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory, storeConsumers); - // Loop-over the future list to wait for each restore to finish, catch any exceptions during restore and throw - // as samza exceptions - for (Future<Void> future : taskRestoreFutures) { - try { - future.get(); - } catch (InterruptedException e) { - LOG.warn("Received an interrupt during store restoration. 
Interrupting the restore executor to exit " - + "prematurely without restoring full state."); - restoreExecutor.shutdownNow(); - throw e; - } catch (Exception e) { - LOG.error("Exception when restoring state.", e); - throw new SamzaException("Exception when restoring state.", e); - } + // Update the task checkpoints map, if it was updated during the restore. Throw an exception if the restore or + // creating a new checkpoint (in case of BlobStoreBackendFactory) failed. + try { + Map<TaskName, Checkpoint> newTaskCheckpoints = initRestoreAndNewCheckpointFuture.get(); + taskCheckpoints.putAll(newTaskCheckpoints); Review Comment: The returned map contains a taskName -> checkpoint entry only if a new checkpoint was created for that task. Task restores that complete without any errors do not create a new checkpoint, so those tasks are absent from the returned map. That's why I am adding the returned entries to the existing map (putAll) rather than replacing it, which would drop the existing checkpoints of those tasks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org