shekhars-li commented on code in PR #1676: URL: https://github.com/apache/samza/pull/1676#discussion_r1289227259
########## samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java: ########## @@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames( } return sideInputStores; } + + /** + * Inits and Restores all the task stores. + * Note: In case of {@link BlobStoreRestoreManager}, this method retries init and restore with getDeleted flag if it + * receives a {@link DeletedException}. This will create a new checkpoint for the corresponding task. + */ + public static CompletableFuture<Map<TaskName, Checkpoint>> initAndRestoreTaskInstances( + Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics, + CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel, + Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, + Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, + File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) { + + Set<String> forceRestoreTasks = new HashSet<>(); + // Initialize each TaskStorageManager. + taskRestoreManagers.forEach((taskName, restoreManagers) -> + restoreManagers.forEach((factoryName, taskRestoreManager) -> { + try { + taskRestoreManager.init(taskCheckpoints.get(taskName)); + } catch (SamzaException ex) { + if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) { + // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well. + // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store + // and creates a new checkpoint. 
+ ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true); + forceRestoreTasks.add(taskName.getTaskName()); + } else { + // log and rethrow exception to communicate restore failure + String msg = String.format("init failed for BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName); + LOG.error(msg, ex); + throw new SamzaException(msg, ex); + } + } + }) + ); + + // Start each store consumer once. + // Note: These consumers are per system and only changelog system store consumers will be started. + // Some TaskRestoreManagers may not require the consumer to be started, but due to the agnostic nature of + // ContainerStorageManager we always start the changelog consumer here in case it is required + storeConsumers.values().stream().distinct().forEach(SystemConsumer::start); + + return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints, taskBackendFactoryToStoreNames, jobContext, + containerModel, samzaContainerMetrics, checkpointManager, config, taskInstanceMetrics, executor, loggerStoreDir, + forceRestoreTasks); + } + + /** + * Restores all TaskInstances and returns a future for each TaskInstance restore. Note: In case of + * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag if it receives a + * {@link DeletedException}. This will create a new Checkpoint. 
+ */ + private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstances( + Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, Map<TaskName, Checkpoint> taskCheckpoints, + Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, JobContext jobContext, + ContainerModel containerModel, SamzaContainerMetrics samzaContainerMetrics, CheckpointManager checkpointManager, + Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ExecutorService executor, + File loggedStoreDir, Set<String> forceRestoreTask) { + + Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new ConcurrentHashMap<>(); + List<Future<Void>> taskRestoreFutures = new ArrayList<>(); + + // Submit restore callable for each taskInstance + taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> { + // Submit for each restore factory + restoreManagersMap.forEach((factoryName, taskRestoreManager) -> { + long startTime = System.currentTimeMillis(); + String taskName = taskInstanceName.getTaskName(); + LOG.info("Starting restore for state for task: {}", taskName); + + CompletableFuture<Void> restoreFuture; + if (forceRestoreTask.contains(taskName) && taskRestoreManager instanceof BlobStoreRestoreManager) { + // If init was retried with getDeleted, force restore with getDeleted as well, since init only inits the + // restoreManager with deleted SnapshotIndex but does not retry to recover the deleted blobs and delegates it + // to restore(). + // Create an empty future that fails immediately with DeletedException to force retry in restore. 
+ restoreFuture = new CompletableFuture<>(); + restoreFuture.completeExceptionally(new SamzaException(new DeletedException())); + } else { + restoreFuture = taskRestoreManager.restore(); + } + + CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, ex) -> { + updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName); + + if (ex != null) { + if (isUnwrappedExceptionDeletedException(ex)) { + LOG.warn( + "Received DeletedException during restore for task {}. Reattempting to get deleted blobs", Review Comment: Updated all occurrences in logs/params etc. to getDeleted. Also added it to more logs for better traceability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org