shekhars-li commented on code in PR #1676: URL: https://github.com/apache/samza/pull/1676#discussion_r1286282038
########## samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java: ########## @@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames( } return sideInputStores; } + + public static List<Future<Void>> initAndRestoreTaskInstances( + Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics, + CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel, + Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, + Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, + File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) { + + Set<String> forceRestoreTasks = new HashSet<>(); + // Initialize each TaskStorageManager. + taskRestoreManagers.forEach((taskName, restoreManagers) -> + restoreManagers.forEach((factoryName, taskRestoreManager) -> { + try { + taskRestoreManager.init(taskCheckpoints.get(taskName)); + } catch (SamzaException ex) { + if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) { + // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well. + // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store + // and creates a new checkpoint. + ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true); + forceRestoreTasks.add(taskName.getTaskName()); + } else { + // log and rethrow exception to communicate restore failure + String msg = String.format("init failed for task: %s with GetDeleted set to true", taskName); + LOG.error(msg, ex); + throw new SamzaException(msg, ex); + } + } + }) + ); + + // Start each store consumer once. + // Note: These consumers are per system and only changelog system store consumers will be started. 
+ // Some TaskRestoreManagers may not require the consumer to be started, but due to the agnostic nature of + // ContainerStorageManager we always start the changelog consumer here in case it is required + storeConsumers.values().stream().distinct().forEach(SystemConsumer::start); + + return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints, taskBackendFactoryToStoreNames, jobContext, + containerModel, samzaContainerMetrics, checkpointManager, config, taskInstanceMetrics, executor, loggerStoreDir, + forceRestoreTasks); + } + + /** + * Restores all TaskInstances and returns a future for each TaskInstance restore. If this restore fails with DeletedException + * it will retry the restore with getDeleted flag, get all the blobs, recreate a new checkpoint in blob store and + * write that checkpoint to checkpoint topic. + * @param taskRestoreManagers map of TaskName to factory name to TaskRestoreManager map. + */ + public static List<Future<Void>> restoreAllTaskInstances( + Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, Map<TaskName, Checkpoint> taskCheckpoints, + Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, JobContext jobContext, + ContainerModel containerModel, SamzaContainerMetrics samzaContainerMetrics, CheckpointManager checkpointManager, + Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ExecutorService executor, + File loggedStoreDir, Set<String> forceRestoreTask) { + + List<Future<Void>> taskRestoreFutures = new ArrayList<>(); + + // Submit restore callable for each taskInstance + taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> { + // Submit for each restore factory + restoreManagersMap.forEach((factoryName, taskRestoreManager) -> { + long startTime = System.currentTimeMillis(); + String taskName = taskInstanceName.getTaskName(); + LOG.info("Starting restore for state for task: {}", taskName); + + CompletableFuture<Void> restoreFuture; + if 
(forceRestoreTask.contains(taskName) && taskRestoreManager instanceof BlobStoreRestoreManager) { + restoreFuture = ((BlobStoreRestoreManager) taskRestoreManager).restore(true); + } else { + restoreFuture = taskRestoreManager.restore(); + } + + CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, ex) -> { + updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName); + + if (ex != null) { + if (isUnwrappedExceptionDeletedException(ex)) { + LOG.warn( + "Restore state for task: {} received DeletedException. Reattempting with getDeletedBlobs enabled", + taskInstanceName.getTaskName()); + + // Try to restore with getDeleted flag + CompletableFuture<Void> future = + restoreTaskStoresAndCreateCheckpointWithGetDeleted(taskInstanceName, taskCheckpoints, checkpointManager, + taskRestoreManager, config, taskInstanceMetrics, executor, + taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir, jobContext, + containerModel); + try { + if (future != null) { + future.join(); // Block and restore deleted state before continuing + } + } catch (Exception e) { + String msg = String.format("Unable to recover from DeletedException for task %s.", taskName); + throw new SamzaException(msg, e); + } finally { + updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName); + } + } else { + // log and rethrow exception to communicate restore failure + String msg = String.format("Error restoring state for task: %s", taskName); + LOG.error(msg, ex); + throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda + } + } + + // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted + // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). + try { + taskRestoreManager.close(); + } catch (Exception e) { + LOG.error("Error closing restore manager for task: {} after {} restore", taskName, + ex != null ? 
"unsuccessful" : "successful", e); + // ignore exception from close. container may still be able to continue processing/backups + // if restore manager close fails. + } + return null; + }); + taskRestoreFutures.add(taskRestoreFuture); + }); + }); + + return taskRestoreFutures; + } + + public static CompletableFuture<Void> restoreTaskStoresAndCreateCheckpointWithGetDeleted(TaskName taskName, + Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager checkpointManager, + TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, + ExecutorService executor, Set<String> storesToRestore, File loggedStoreBaseDirectory, JobContext jobContext, + ContainerModel containerModel) { + + // if taskInstanceMetrics are specified use those for store metrics, + // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap + MetricsRegistry metricsRegistry = + taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() + : new MetricsRegistryMap(); + + BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor); + JobConfig jobConfig = new JobConfig(config); + BlobStoreUtil blobStoreUtil = + new BlobStoreUtil(blobStoreManager, executor, new BlobStoreConfig(config), null, + new BlobStoreRestoreManagerMetrics(metricsRegistry)); + + BlobStoreRestoreManager blobStoreRestoreManager = (BlobStoreRestoreManager) taskRestoreManager; + + CheckpointId checkpointId = CheckpointId.create(); + Map<String, String> oldSCMs = ((CheckpointV2) taskCheckpoints.get(taskName)) + .getStateCheckpointMarkers().get(BlobStoreStateBackendFactory.class.getName()); + + // Returns a single future that guarantees all the following are completed, in this order: + // 1. Restore state locally by getting deleted blobs from the blob store. + // 2. Create a new Checkpoint from restored state and back it up on the blob store. + // 3. Clean up old/deleted Snapshot + // 4. 
Remove TTL from the new Snapshot on the blob store + // 5. Write the new checkpoint to checkpoint topic and update taskCheckpoints map + + // 1. restore state with getDeleted flag set to true + return blobStoreRestoreManager.restore(true).thenCompose(r -> { + // 2. Create and backup new checkpoint on the blob store. + CompletableFuture<Map<String, String>> uploadSCMs = + createNewCheckpointAndBackupStores(jobContext, containerModel, config, taskName, storesToRestore, checkpointId, + loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, executor); + CompletableFuture<Void> future = uploadSCMs.thenCompose(scms -> { + // 3. Delete prev SnapshotIndex including files/subdirs, + Map<String, CompletionStage<Void>> deleteOldSnapshotsFutures = + deletePrevSnapshots(blobStoreUtil, oldSCMs, taskName, jobConfig); + // 4. Mark new Snapshots to never expire + Map<String, CompletionStage<Void>> removeNewSnapshotsTTLFutures = + removeTTLNewSnapshots(blobStoreUtil, scms, taskName, jobConfig); + // Combined future of delete old snapshots and removeTTL of new snapshots + CompletableFuture<Void> removeTTLFuture = CompletableFuture.allOf(FutureUtil.mapToFuture(deleteOldSnapshotsFutures), + FutureUtil.mapToFuture(removeNewSnapshotsTTLFutures)); + // 5. Update taskCheckpoints map with new checkpoint + CompletableFuture<Void> checkpointFuture = removeTTLFuture.thenRun(() -> { Review Comment: Updated to clarify each step and the dependencies between them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org