shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1286282038


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, 
ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, 
Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the 
task to be restored with GetDeleted as well.
+              // this ensures that the restore downloads the snapshot, 
recreates a new snapshot, uploads it to blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) 
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+              forceRestoreTasks.add(taskName.getTaskName());
+            } else {
+              // log and rethrow exception to communicate restore failure
+              String msg = String.format("init failed for task: %s with 
GetDeleted set to true", taskName);
+              LOG.error(msg, ex);
+              throw new SamzaException(msg, ex);
+            }
+          }
+        })
+    );
+
+    // Start each store consumer once.
+    // Note: These consumers are per system and only changelog system store 
consumers will be started.
+    // Some TaskRestoreManagers may not require the consumer to be started, 
but due to the agnostic nature of
+    // ContainerStorageManager we always start the changelog consumer here in 
case it is required
+    storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+    return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints, 
taskBackendFactoryToStoreNames, jobContext,
+        containerModel, samzaContainerMetrics, checkpointManager, config, 
taskInstanceMetrics, executor, loggerStoreDir,
+        forceRestoreTasks);
+  }
+
+  /**
+   * Restores all TaskInstances and returns a future for each TaskInstance restore. If this restore fails with
+   * DeletedException it will retry the restore with getDeleted flag, get all the blobs, recreate a new checkpoint
+   * in blob store and write that checkpoint to checkpoint topic.
+   *
+   * <p>One future is created per task and restore factory. Inside each future the restore manager is closed once
+   * the restore attempt has finished, whether it succeeded or failed; an exception thrown by {@code close()} is
+   * logged and ignored.
+   *
+   * @param taskRestoreManagers map of TaskName to factory name to TaskRestoreManager map.
+   * @param taskCheckpoints checkpoint per task, handed to the getDeleted retry
+   * @param taskBackendFactoryToStoreNames map of TaskName to factory name to store names; the entry for the failing
+   *                                       task and factory is handed to the getDeleted retry
+   * @param jobContext handed to the getDeleted retry
+   * @param containerModel handed to the getDeleted retry
+   * @param samzaContainerMetrics metrics passed to {@code updateRestoreTime}
+   * @param checkpointManager handed to the getDeleted retry
+   * @param config handed to the getDeleted retry
+   * @param taskInstanceMetrics handed to the getDeleted retry
+   * @param executor handed to the getDeleted retry
+   * @param loggedStoreDir handed to the getDeleted retry
+   * @param forceRestoreTask names of tasks whose {@link BlobStoreRestoreManager} restore is started with the
+   *                         getDeleted flag already set
+   * @return one future per task and restore factory; a future completes exceptionally when the restore fails and
+   *         cannot be recovered
+   */
+  public static List<Future<Void>> restoreAllTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, Map<TaskName, Checkpoint> taskCheckpoints,
+      Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, JobContext jobContext,
+      ContainerModel containerModel, SamzaContainerMetrics samzaContainerMetrics, CheckpointManager checkpointManager,
+      Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ExecutorService executor,
+      File loggedStoreDir, Set<String> forceRestoreTask) {
+
+    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+    // Submit restore callable for each taskInstance
+    taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+      // Submit for each restore factory
+      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+        long startTime = System.currentTimeMillis();
+        String taskName = taskInstanceName.getTaskName();
+        LOG.info("Starting restore for state for task: {}", taskName);
+
+        CompletableFuture<Void> restoreFuture;
+        if (forceRestoreTask.contains(taskName) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+          restoreFuture = ((BlobStoreRestoreManager) taskRestoreManager).restore(true);
+        } else {
+          restoreFuture = taskRestoreManager.restore();
+        }
+
+        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, ex) -> {
+          updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
+
+          // Final outcome of the restore, including a successful getDeleted retry after an initial failure.
+          // Used only to report the outcome accurately if close() fails below.
+          boolean restoreSucceeded = false;
+          try {
+            if (ex != null) {
+              if (isUnwrappedExceptionDeletedException(ex)) {
+                LOG.warn(
+                    "Restore state for task: {} received DeletedException. Reattempting with getDeletedBlobs enabled",
+                    taskInstanceName.getTaskName());
+
+                // Try to restore with getDeleted flag
+                CompletableFuture<Void> future =
+                    restoreTaskStoresAndCreateCheckpointWithGetDeleted(taskInstanceName, taskCheckpoints,
+                        checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
+                        taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
+                        jobContext, containerModel);
+                try {
+                  if (future != null) {
+                    future.join(); // Block and restore deleted state before continuing
+                  }
+                } catch (Exception e) {
+                  String msg = String.format("Unable to recover from DeletedException for task %s.", taskName);
+                  throw new SamzaException(msg, e);
+                } finally {
+                  // Record the restore time again so it includes the time spent in the retry.
+                  updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
+                }
+              } else {
+                // log and rethrow exception to communicate restore failure
+                String msg = String.format("Error restoring state for task: %s", taskName);
+                LOG.error(msg, ex);
+                throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
+              }
+            }
+            restoreSucceeded = true;
+            return null;
+          } finally {
+            // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are
+            // compacted on stop, so paralleling stop() also parallelizes their compaction (a time-intensive
+            // operation). This runs in a finally block so the restore manager is also closed when the restore
+            // failed and the exception is being rethrown.
+            try {
+              taskRestoreManager.close();
+            } catch (Exception e) {
+              LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
+                  restoreSucceeded ? "successful" : "unsuccessful", e);
+              // ignore exception from close. container may still be able to continue processing/backups
+              // if restore manager close fails.
+            }
+          }
+        });
+        taskRestoreFutures.add(taskRestoreFuture);
+      });
+    });
+
+    return taskRestoreFutures;
+  }
+
+  public static CompletableFuture<Void> 
restoreTaskStoresAndCreateCheckpointWithGetDeleted(TaskName taskName,
+      Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager 
checkpointManager,
+      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      ExecutorService executor, Set<String> storesToRestore, File 
loggedStoreBaseDirectory, JobContext jobContext,
+      ContainerModel containerModel) {
+
+    // if taskInstanceMetrics are specified use those for store metrics,
+    // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
+    MetricsRegistry metricsRegistry =
+        taskInstanceMetrics.get(taskName) != null ? 
taskInstanceMetrics.get(taskName).registry()
+            : new MetricsRegistryMap();
+
+    BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
+    JobConfig jobConfig = new JobConfig(config);
+    BlobStoreUtil blobStoreUtil =
+        new BlobStoreUtil(blobStoreManager, executor, new 
BlobStoreConfig(config), null,
+            new BlobStoreRestoreManagerMetrics(metricsRegistry));
+
+    BlobStoreRestoreManager blobStoreRestoreManager = 
(BlobStoreRestoreManager) taskRestoreManager;
+
+    CheckpointId checkpointId = CheckpointId.create();
+    Map<String, String> oldSCMs = ((CheckpointV2) 
taskCheckpoints.get(taskName))
+        
.getStateCheckpointMarkers().get(BlobStoreStateBackendFactory.class.getName());
+
+    // Returns a single future that guarantees all the following are 
completed, in this order:
+    // 1. Restore state locally by getting deleted blobs from the blob store.
+    // 2. Create a new Checkpoint from restored state and back it up on the 
blob store.
+    // 3. Clean up old/deleted Snapshot
+    // 4. Remove TTL from the new Snapshot on the blob store
+    // 5. Write the new checkpoint to checkpoint topic and update 
taskCheckpoints map
+
+    // 1. restore state with getDeleted flag set to true
+    return blobStoreRestoreManager.restore(true).thenCompose(r -> {
+      // 2. Create and backup new checkpoint on the blob store.
+      CompletableFuture<Map<String, String>> uploadSCMs =
+          createNewCheckpointAndBackupStores(jobContext, containerModel, 
config, taskName, storesToRestore, checkpointId,
+              loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, 
executor);
+      CompletableFuture<Void> future = uploadSCMs.thenCompose(scms -> {
+        // 3. Delete prev SnapshotIndex including files/subdirs,
+        Map<String, CompletionStage<Void>> deleteOldSnapshotsFutures =
+            deletePrevSnapshots(blobStoreUtil, oldSCMs, taskName, jobConfig);
+        // 4. Mark new Snapshots to never expire
+        Map<String, CompletionStage<Void>> removeNewSnapshotsTTLFutures =
+            removeTTLNewSnapshots(blobStoreUtil, scms, taskName, jobConfig);
+        // Combined future of delete old snapshots and removeTTL of new 
snapshots
+        CompletableFuture<Void> removeTTLFuture = 
CompletableFuture.allOf(FutureUtil.mapToFuture(deleteOldSnapshotsFutures),
+            FutureUtil.mapToFuture(removeNewSnapshotsTTLFutures));
+        // 5. Update taskCheckpoints map with new checkpoint
+        CompletableFuture<Void> checkpointFuture = removeTTLFuture.thenRun(() 
-> {

Review Comment:
   Updated to clarify each step and its dependencies. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to