shekhars-li commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r635672000



##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the 
corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed 
checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful 
{@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the 
upload.
+   *
+   * The contents of this map are used to:
+   * 1. Delete any unused stores from the previous deployment in the remote 
store during {@link #init}.
+   * 2. Calculate the diff for local state between the last and the current 
checkpoint during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the 
previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to 
{@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its 
entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). 
It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread 
than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel 
containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics 
blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil 
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = 
CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, 
taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId 
checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>

Review comment:
       Addressed in second pass. 

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -66,6 +66,9 @@
   public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + 
"%s.changelog." + MIN_COMPACTION_LAG_MS;
   public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = 
TimeUnit.HOURS.toMillis(4);
 
+  public static final String BLOB_STORE_BACKEND_ADMIN_FACTORY = 
"blob.store.backend.admin.factory";

Review comment:
       Addressed in second pass.

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskRestoreManager;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.FutureUtil;
+import org.checkerframework.checker.nullness.Opt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreRestoreManager implements TaskRestoreManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreRestoreManager.class);
+  // when checking if checkpoint dir is the same as remote snapshot, exclude 
the "OFFSET" family of files
+  // that are written to the checkpoint dir after the remote upload is 
complete as part of
+  // TaskStorageCommitManager#writeCheckpointToStoreDirectories.
+  private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(
+      StorageManagerUtil.OFFSET_FILE_NAME_LEGACY,
+      StorageManagerUtil.OFFSET_FILE_NAME_NEW,
+      StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY,
+      StorageManagerUtil.CHECKPOINT_FILE_NAME);
+
+  private final TaskModel taskModel;
+  private final String jobName;
+  private final String jobId;
+  private final ExecutorService executor;
+  private final Config config;
+  private final StorageConfig storageConfig;
+  private final StorageManagerUtil storageManagerUtil;
+  private final BlobStoreUtil blobStoreUtil;
+  private final File loggedBaseDir;
+  private final File nonLoggedBaseDir;
+  private final String taskName;
+  private final List<String> storesToRestore;
+
+  private final BlobStoreRestoreManagerMetrics metrics;
+
+  /**
+   * Map of store name and Pair of blob id of SnapshotIndex and the 
corresponding SnapshotIndex from last snapshot
+   * creation
+   */
+  private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
+
+  public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService 
restoreExecutor,
+      BlobStoreRestoreManagerMetrics metrics, Config config, 
StorageManagerUtil storageManagerUtil,
+      BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) {
+    this.taskModel = taskModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.executor = restoreExecutor; // TODO BLOCKER dchen1 don't block on 
restore executor
+    this.config = config;
+    this.storageConfig = new StorageConfig(config);
+    this.storageManagerUtil = storageManagerUtil;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexes = new HashMap<>();
+    this.loggedBaseDir = loggedBaseDir;
+    this.nonLoggedBaseDir = nonLoggedBaseDir;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.storesToRestore =
+        
storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store restore manager for task: {}", 
taskName);
+    // get previous SCMs from checkpoint
+    prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, 
taskName, checkpoint, blobStoreUtil);
+    metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
+    LOG.trace("Found previous snapshot index during blob store restore manager 
init for task: {} to be: {}",
+        taskName, prevStoreSnapshotIndexes);
+
+    metrics.initStoreMetrics(storesToRestore);
+
+    // Note: blocks the caller (main) thread.
+    deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, 
prevStoreSnapshotIndexes, blobStoreUtil, executor);
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  /**
+   * Restore state from checkpoints, state snapshots and changelog.
+   */
+  @Override
+  public void restore() {
+    restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, 
prevStoreSnapshotIndexes, loggedBaseDir,
+        storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor);
+  }
+
+  @Override
+  public void close() {
+  }

Review comment:
       Addressed in second pass

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/index/SnapshotIndex.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.index;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+
+/**
+ * A {@link SnapshotIndex} contains all the information necessary for 
recreating the local store by
+ * downloading its contents from the remote blob store. The {@link 
SnapshotIndex} is itself serialized
+ * and stored as a blob in the remote store, and its blob id tracked in the 
Task checkpoint.
+ */
+public class SnapshotIndex {
+  private static final short SCHEMA_VERSION = 1;

Review comment:
       I should have added this to the other schemas (DirIndex, FileIndex, etc.) as 
well, and it also needs to be added to the serde logic. I will do that as part of a 
follow-up update.

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the 
corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed 
checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful 
{@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the 
upload.
+   *
+   * The contents of this map are used to:
+   * 1. Delete any unused stores from the previous deployment in the remote 
store during {@link #init}.
+   * 2. Calculate the diff for local state between the last and the current 
checkpoint during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the 
previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to 
{@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its 
entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). 
It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread 
than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel 
containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics 
blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil 
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = 
CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, 
taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId 
checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+        storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+    Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new 
HashMap<>();
+
+    storesToBackup.forEach((storeName) -> {
+      long storeUploadStartTime = System.nanoTime();
+      try {
+        // metadata for the current store snapshot to upload
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, 
jobName, jobId, taskName, storeName);
+
+        // Only durable/persistent stores are passed here from commit manager
+        // get the local store dir corresponding to the current checkpointId
+        File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, 
storeName,
+            taskModel.getTaskName(), taskModel.getTaskMode());
+        String checkpointDirPath = 
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+        File checkpointDir = new File(checkpointDirPath);
+
+        LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+            taskName, storeName, storeDir, checkpointDir);
+
+        // get the previous store directory contents
+        DirIndex prevDirIndex;
+
+        // guaranteed to be available since a new task commit may not start 
until the previous one is complete
+        Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+            prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+        if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+          prevDirIndex = 
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+        } else {
+          // no previous SnapshotIndex means that this is the first commit for 
this store. Create an empty DirIndex.
+          prevDirIndex = new DirIndex(checkpointDir.getName(), 
Collections.emptyList(), Collections.emptyList(),
+              Collections.emptyList(), Collections.emptyList());
+        }
+
+        long dirDiffStartTime = System.nanoTime();
+        // get the diff between previous and current store directories
+        DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, 
BlobStoreUtil.areSameFile(false));
+        metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - 
dirDiffStartTime);
+
+        DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+        updateStoreDiffMetrics(storeName, stats);
+        metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
+        metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded);
+        metrics.filesRemaining.getValue().addAndGet(stats.filesAdded);
+        metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded);
+        metrics.filesToRetain.getValue().addAndGet(stats.filesRetained);
+        metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained);
+
+        // upload the diff to the blob store and get the new directory index
+        CompletionStage<DirIndex> dirIndexFuture = 
blobStoreUtil.putDir(dirDiff, snapshotMetadata);
+
+        CompletionStage<SnapshotIndex> snapshotIndexFuture =
+            dirIndexFuture.thenApplyAsync(dirIndex -> {
+              LOG.trace("Dir upload complete. Returning new SnapshotIndex for 
task: {} store: {}.", taskName, storeName);
+              Optional<String> prevSnapshotIndexBlobId =
+                  Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName))
+                  .map(Pair::getLeft);
+              return new SnapshotIndex(clock.currentTimeMillis(), 
snapshotMetadata, dirIndex, prevSnapshotIndexBlobId);
+            }, executor);
+
+        // upload the new snapshot index to the blob store and get its blob id
+        CompletionStage<String> snapshotIndexBlobIdFuture =
+            snapshotIndexFuture
+                .thenComposeAsync(si -> {
+                  LOG.trace("Uploading Snapshot index for task: {} store: {}", 
taskName, storeName);
+                  return blobStoreUtil.putSnapshotIndex(si);
+                }, executor);
+
+        // update the temporary storeName to previous snapshot index map with 
the new mapping.

Review comment:
       Updated in second pass

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskRestoreManager;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.FutureUtil;
+import org.checkerframework.checker.nullness.Opt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreRestoreManager implements TaskRestoreManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreRestoreManager.class);
  // When checking if a checkpoint dir is the same as the remote snapshot, exclude the "OFFSET" family of files
  // that are written to the checkpoint dir after the remote upload is complete as part of
  // TaskStorageCommitManager#writeCheckpointToStoreDirectories. Including them would make every
  // comparison fail and force an unnecessary restore from the remote snapshot.
  private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(
      StorageManagerUtil.OFFSET_FILE_NAME_LEGACY,
      StorageManagerUtil.OFFSET_FILE_NAME_NEW,
      StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY,
      StorageManagerUtil.CHECKPOINT_FILE_NAME);
+
  private final TaskModel taskModel;
  private final String jobName;
  private final String jobId;
  // executor used for the async stages of restore; init() currently also blocks on it (see TODO in ctor)
  private final ExecutorService executor;
  private final Config config;
  private final StorageConfig storageConfig;
  private final StorageManagerUtil storageManagerUtil;
  private final BlobStoreUtil blobStoreUtil;
  private final File loggedBaseDir;
  private final File nonLoggedBaseDir;
  private final String taskName;
  // stores configured to restore via the blob store state backend (from StorageConfig)
  private final List<String> storesToRestore;

  private final BlobStoreRestoreManagerMetrics metrics;

  /**
   * Map of store name to a Pair of the blob id of the {@link SnapshotIndex} and the corresponding
   * {@link SnapshotIndex} from the last snapshot creation. Populated from the checkpoint in {@link #init}.
   */
  private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
+
+  public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService 
restoreExecutor,
+      BlobStoreRestoreManagerMetrics metrics, Config config, 
StorageManagerUtil storageManagerUtil,
+      BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) {
+    this.taskModel = taskModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.executor = restoreExecutor; // TODO BLOCKER dchen1 dont block on 
restore executor
+    this.config = config;
+    this.storageConfig = new StorageConfig(config);
+    this.storageManagerUtil = storageManagerUtil;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexes = new HashMap<>();
+    this.loggedBaseDir = loggedBaseDir;
+    this.nonLoggedBaseDir = nonLoggedBaseDir;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.storesToRestore =
+        
storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store restore manager for task: {}", 
taskName);
+    // get previous SCMs from checkpoint
+    prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, 
taskName, checkpoint, blobStoreUtil);
+    metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
+    LOG.trace("Found previous snapshot index during blob store restore manager 
init for task: {} to be: {}",
+        taskName, prevStoreSnapshotIndexes);
+
+    metrics.initStoreMetrics(storesToRestore);
+
+    // Note: blocks the caller (main) thread.
+    deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, 
prevStoreSnapshotIndexes, blobStoreUtil, executor);
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  /**
+   * Restore state from checkpoints, state snapshots and changelog.
+   */
+  @Override
+  public void restore() {
+    restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, 
prevStoreSnapshotIndexes, loggedBaseDir,
+        storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Deletes blob store contents for stores that were present in the last 
checkpoint but are either no longer
+   * present in job configs (removed by user since last deploymetn) or are no 
longer configured to be backed
+   * up using blob stores.
+   *
+   * This method blocks until all the necessary store contents and snapshot 
index blobs have been marked for deletion.
+   */
+  @VisibleForTesting
+  static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, 
String taskName, StorageConfig storageConfig,
+      Map<String, Pair<String, SnapshotIndex>> initialStoreSnapshotIndexes,
+      BlobStoreUtil blobStoreUtil, ExecutorService executor) {
+
+    List<String> storesToBackup =
+        
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    List<String> storesToRestore =
+        
storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+
+    List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
+    initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
+      if (!storesToBackup.contains(storeName) && 
!storesToRestore.contains(storeName)) {
+        LOG.debug("Removing task: {} store: {} from blob store. It is either 
no longer used, " +
+            "or is no longer configured to be backed up or restored with blob 
store.", taskName, storeName);
+        DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
+        Metadata requestMetadata =
+            new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, 
Optional.empty(), jobName, jobId, taskName, storeName);
+        CompletionStage<Void> storeDeletionFuture =
+            blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete 
files and sub-dirs previously marked for removal
+                .thenComposeAsync(v ->
+                    blobStoreUtil.deleteDir(dirIndex, requestMetadata), 
executor) // deleted files and dirs still present
+                .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
+                    scmAndSnapshotIndex.getLeft(), requestMetadata),
+                    executor); // delete the snapshot index blob
+        storeDeletionFutures.add(storeDeletionFuture);
+      }
+    });
+
+    FutureUtil.allOf(storeDeletionFutures).join();
+  }
+
+  /**
+   * Restores all eligible stores in the task.
+   */
+  @VisibleForTesting
+  static void restoreStores(String jobName, String jobId, TaskName taskName, 
List<String> storesToRestore,
+      Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
+      File loggedBaseDir, StorageConfig storageConfig, 
BlobStoreRestoreManagerMetrics metrics,
+      StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
+      ExecutorService executor) {
+    long restoreStartTime = System.nanoTime();
+    List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
+
+    LOG.debug("Starting restore for task: {} stores: {}", taskName, 
storesToRestore);
+    storesToRestore.forEach(storeName -> {
+      if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
+        LOG.debug("No checkpointed snapshot index found for task: {} store: 
{}. Skipping restore.", taskName, storeName);
+        // TODO HIGH shesharm what should we do with the local state already 
present on disk, if any?
+        // E.g. this will be the case if user changes a store from changelog 
based backup and restore to
+        // blob store based backup and restore, both at the same time.
+        return;
+      }
+
+      Pair<String, SnapshotIndex> scmAndSnapshotIndex = 
prevStoreSnapshotIndexes.get(storeName);
+
+      long storeRestoreStartTime = System.nanoTime();
+      SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight();
+      DirIndex dirIndex = snapshotIndex.getDirIndex();
+
+      // TODO MINOR shesharm: calculate recursively similar to DirDiff.Stats
+      long bytesToRestore = dirIndex.getFilesPresent().stream().mapToLong(fi 
-> fi.getFileMetadata().getSize()).sum();
+      
metrics.filesToRestore.getValue().addAndGet(dirIndex.getFilesPresent().size());
+      metrics.bytesToRestore.getValue().addAndGet(bytesToRestore);
+      
metrics.filesRemaining.getValue().addAndGet(dirIndex.getFilesPresent().size());
+      metrics.bytesRemaining.getValue().addAndGet(bytesToRestore);
+
+      CheckpointId checkpointId = 
snapshotIndex.getSnapshotMetadata().getCheckpointId();
+      File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, 
storeName, taskName, TaskMode.Active);
+      Path storeCheckpointDir = 
Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
+      LOG.trace("Got task: {} store: {} local store directory: {} and local 
store checkpoint directory: {}",
+          taskName, storeName, storeDir, storeCheckpointDir);
+
+      // we always delete the store dir to preserve transactional state 
guarantees.
+      try {
+        LOG.debug("Deleting local store directory: {}. Will be restored from 
local store checkpoint directory " +
+            "or remote snapshot.", storeDir);
+        FileUtils.deleteDirectory(storeDir);
+      } catch (IOException e) {
+        throw new SamzaException(String.format("Error deleting store 
directory: %s", storeDir), e);
+      }
+
+      boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, 
dirIndex,
+          storeCheckpointDir, storageConfig, blobStoreUtil);
+
+      if (shouldRestore) { // restore the store from the remote blob store
+        LOG.debug("Deleting local store checkpoint directory: {} before 
restore.", storeCheckpointDir);
+        // delete all store checkpoint directories. if we only delete the 
store directory and don't
+        // delete the checkpoint directories, the store size on disk will grow 
to 2x after restore
+        // until the first commit is completed and older checkpoint dirs are 
deleted. This is
+        // because the hard-linked checkpoint dir files will no longer be 
de-duped with the
+        // now-deleted main store directory contents and will take up 
additional space of their
+        // own during the restore.
+        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, 
storageManagerUtil);
+
+        enqueueRestore(jobName, jobId, taskName.toString(), storeName, 
storeDir, dirIndex, storeRestoreStartTime,
+            restoreFutures, blobStoreUtil, metrics, executor);
+      } else {
+        LOG.debug("Renaming store checkpoint directory: {} to store directory: 
{} since its contents are identical " +
+            "to the remote snapshot.", storeCheckpointDir, storeDir);
+        // atomically rename the checkpoint dir to the store dir
+        new FileUtil().move(storeCheckpointDir.toFile(), storeDir);
+
+        // delete any other checkpoint dirs.
+        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, 
storageManagerUtil);
+      }
+    });
+
+    // wait for all restores to finish
+    FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
+      LOG.info("Restore completed for task: {} stores", taskName);
+      metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
+    }).join(); // TODO BLOCKER dchen1 make non-blocking.
+  }
+
+  /**
+   * Determines if the store needs to be restored from remote snapshot based 
on local and remote state.
+   */
+  @VisibleForTesting
+  static boolean shouldRestore(String taskName, String storeName, DirIndex 
dirIndex,
+      Path storeCheckpointDir, StorageConfig storageConfig, BlobStoreUtil 
blobStoreUtil) {
+    // if a store checkpoint directory exists for the last successful task 
checkpoint, try to use it.
+    boolean restoreStore;
+    if (Files.exists(storeCheckpointDir)) {
+      if (storageConfig.getCleanLoggedStoreDirsOnStart(storeName)) {

Review comment:
       The method name seems right to me. That method just gets the config 
associated with 'clean logged store dirs on start' boolean. Did you mean 
something else?

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
  private final JobModel jobModel;
  // executor for the async upload/index stages of each commit
  private final ExecutorService executor;
  private final String jobName;
  private final String jobId;
  private final ContainerModel containerModel;
  private final TaskModel taskModel;
  private final String taskName;
  private final Config config;
  // injected clock, used for the SnapshotIndex creation timestamp; injectable for testability
  private final Clock clock;
  private final StorageManagerUtil storageManagerUtil;
  // stores configured to back up via the blob store state backend (from StorageConfig)
  private final List<String> storesToBackup;
  private final File loggedStoreBaseDir;
  private final BlobStoreUtil blobStoreUtil;

  private final BlobStoreBackupManagerMetrics metrics;

  /**
   * Map of store name to a Pair of the blob id of {@link SnapshotIndex} and the corresponding
   * {@link SnapshotIndex} from the last successful task checkpoint or {@link #upload}.
   *
   * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the
   * previous deployment, if any.
   *
   * During regular processing, this map is updated after each successful {@link #upload} with the blob id of
   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload.
   *
   * The contents of this map are used to:
   * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}.
   * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}.
   *
   * Since the task commit process guarantees that the async stage of the previous commit is complete before
   * another commit can start, this future is guaranteed to be complete in the call to {@link #upload} during
   * the next commit.
   *
   * This field is non-final, since the future itself is replaced in its entirety after init/upload.
   * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility
   * across threads since the map assignment may happen on a different thread than the one reading the contents.
   */
  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel 
containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics 
blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil 
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = 
CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, 
taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId 
checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+        storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+    Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new 
HashMap<>();
+
+    storesToBackup.forEach((storeName) -> {
+      long storeUploadStartTime = System.nanoTime();
+      try {
+        // metadata for the current store snapshot to upload
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, 
jobName, jobId, taskName, storeName);
+
+        // Only durable/persistent stores are passed here from commit manager
+        // get the local store dir corresponding to the current checkpointId
+        File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, 
storeName,
+            taskModel.getTaskName(), taskModel.getTaskMode());
+        String checkpointDirPath = 
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+        File checkpointDir = new File(checkpointDirPath);
+
+        LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+            taskName, storeName, storeDir, checkpointDir);
+
+        // get the previous store directory contents
+        DirIndex prevDirIndex;
+
+        // guaranteed to be available since a new task commit may not start 
until the previous one is complete
+        Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+            prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+        if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+          prevDirIndex = 
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+        } else {
+          // no previous SnapshotIndex means that this is the first commit for 
this store. Create an empty DirIndex.
+          prevDirIndex = new DirIndex(checkpointDir.getName(), 
Collections.emptyList(), Collections.emptyList(),
+              Collections.emptyList(), Collections.emptyList());
+        }
+
+        long dirDiffStartTime = System.nanoTime();
+        // get the diff between previous and current store directories
+        DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, 
BlobStoreUtil.areSameFile(false));
+        metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - 
dirDiffStartTime);
+
+        DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+        updateStoreDiffMetrics(storeName, stats);
+        metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
+        metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded);
+        metrics.filesRemaining.getValue().addAndGet(stats.filesAdded);
+        metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded);
+        metrics.filesToRetain.getValue().addAndGet(stats.filesRetained);
+        metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained);
+
+        // upload the diff to the blob store and get the new directory index
+        CompletionStage<DirIndex> dirIndexFuture = 
blobStoreUtil.putDir(dirDiff, snapshotMetadata);
+
+        CompletionStage<SnapshotIndex> snapshotIndexFuture =
+            dirIndexFuture.thenApplyAsync(dirIndex -> {
+              LOG.trace("Dir upload complete. Returning new SnapshotIndex for 
task: {} store: {}.", taskName, storeName);
+              Optional<String> prevSnapshotIndexBlobId =
+                  Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName))
+                  .map(Pair::getLeft);
+              return new SnapshotIndex(clock.currentTimeMillis(), 
snapshotMetadata, dirIndex, prevSnapshotIndexBlobId);
+            }, executor);
+
+        // upload the new snapshot index to the blob store and get its blob id
+        CompletionStage<String> snapshotIndexBlobIdFuture =
+            snapshotIndexFuture
+                .thenComposeAsync(si -> {
+                  LOG.trace("Uploading Snapshot index for task: {} store: {}", 
taskName, storeName);
+                  return blobStoreUtil.putSnapshotIndex(si);
+                }, executor);
+
+        // update the temporary storeName to previous snapshot index map with 
the new mapping.
+        CompletableFuture<Pair<String, SnapshotIndex>> 
scmAndSnapshotIndexPairFuture =
+            FutureUtil.toFutureOfPair(
+                Pair.of(snapshotIndexBlobIdFuture.toCompletableFuture(), 
snapshotIndexFuture.toCompletableFuture()));
+
+        scmAndSnapshotIndexPairFuture.whenComplete((res, ex) -> {
+          long uploadTimeNs = System.nanoTime() - storeUploadStartTime;
+          metrics.storeUploadNs.get(storeName).update(uploadTimeNs);
+        });
+
+        storeToSCMAndSnapshotIndexPairFutures.put(storeName, 
scmAndSnapshotIndexPairFuture);
+        storeToSerializedSCMFuture.put(storeName, 
snapshotIndexBlobIdFuture.toCompletableFuture());
+      } catch (Exception e) {
+        throw new SamzaException(
+            String.format("Error uploading store snapshot to blob store for 
task: %s, store: %s, checkpointId: %s",
+                taskName, storeName, checkpointId), e);
+      }
+    });
+
+    // replace the previous storeName to snapshot index mapping with the new 
mapping.
+    this.prevStoreSnapshotIndexesFuture =
+        FutureUtil.toFutureOfMap(storeToSCMAndSnapshotIndexPairFutures);
+
+    return FutureUtil.toFutureOfMap(storeToSerializedSCMFuture)
+        .whenComplete((res, ex) -> metrics.uploadNs.update(System.nanoTime() - 
uploadStartTime));
+  }
+
+  /**
+   * Clean up would be called at the end of every commit as well as on a 
container start/restart.
+   * Clean up involves the following steps:
+   * 1. Remove TTL of the snapshot index blob and for any associated files and 
sub-dirs marked for retention.
+   * 2. Delete the files/subdirs marked for deletion in the snapshot index.
+   * 3. Delete the remote {@link SnapshotIndex} blob for the previous 
checkpoint.
+   * @param checkpointId the {@link CheckpointId} of the last successfully 
committed checkpoint.
+   * @param storeSCMs store name to state checkpoint markers for the last 
successfully committed checkpoint
+   */
+  @Override
+  public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, 
Map<String, String> storeSCMs) {
+    long startTime = System.nanoTime();
+    List<CompletionStage<Void>> removeTTLFutures = new ArrayList<>();
+    List<CompletionStage<Void>> cleanupRemoteSnapshotFutures = new 
ArrayList<>();
+    List<CompletionStage<Void>> removePrevRemoteSnapshotFutures = new 
ArrayList<>();
+
+    // SCM, in case of blob store backup and restore, is just the blob id of 
SnapshotIndex representing the remote snapshot
+    storeSCMs.forEach((storeName, snapshotIndexBlobId) -> {
+      Metadata requestMetadata =
+          new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.empty(), 
jobName, jobId, taskName, storeName);
+      CompletionStage<SnapshotIndex> snapshotIndexFuture =
+          blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId, requestMetadata);
+
+      // 1. remove TTL of index blob and all of its files and sub-dirs marked 
for retention
+      CompletionStage<Void> removeTTLFuture =
+          snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+            LOG.debug("Removing TTL for index blob: {} for task: {} store :{}",
+                snapshotIndexBlobId, taskName, storeName);
+            return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, 
requestMetadata);
+          }, executor);
+      removeTTLFutures.add(removeTTLFuture);
+
+      // 2. delete the files/subdirs marked for deletion in the snapshot index.
+      CompletionStage<Void> cleanupRemoteSnapshotFuture =
+          snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+            LOG.debug("Deleting files and dirs to remove for current index 
blob: {} for task: {} store: {}",
+                snapshotIndexBlobId, taskName, storeName);
+            return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), 
requestMetadata);
+          }, executor);
+
+      cleanupRemoteSnapshotFutures.add(cleanupRemoteSnapshotFuture);
+
+      // 3. delete the remote {@link SnapshotIndex} blob for the previous 
checkpoint.
+      CompletionStage<Void> removePrevRemoteSnapshotFuture =
+          snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+            if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
+              String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
+              LOG.debug("Removing previous snapshot index blob: {} from blob 
store for task: {} store: {}.",
+                  blobId, taskName, storeName);
+              return blobStoreUtil.deleteSnapshotIndexBlob(blobId, 
requestMetadata);
+            } else {
+              // complete future immediately. There are no previous snapshots 
index blobs to delete.
+              return CompletableFuture.completedFuture(null);
+            }
+          }, executor);
+      removePrevRemoteSnapshotFutures.add(removePrevRemoteSnapshotFuture);
+    });
+
+    return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures, 
removePrevRemoteSnapshotFutures)
+        .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime() 
- startTime));
+  }
+
+  @Override
+  public void close() {
+    // TODO need to init and close blob store manager instances?

Review comment:
       Addressed in second pass




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to