shekhars-li commented on a change in pull request #1501: URL: https://github.com/apache/samza/pull/1501#discussion_r635672000
########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore; + +import com.google.common.collect.ImmutableMap; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.storage.StorageManagerUtil; +import org.apache.samza.storage.TaskBackupManager; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil; +import org.apache.samza.storage.blobstore.util.BlobStoreUtil; +import org.apache.samza.storage.blobstore.util.DirDiffUtil; +import org.apache.samza.util.Clock; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BlobStoreBackupManager implements TaskBackupManager { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class); + + private final JobModel jobModel; + private final ExecutorService executor; + private final String jobName; + private final String jobId; + private final ContainerModel containerModel; + private final TaskModel taskModel; + private final String taskName; + private final Config config; + private final Clock clock; + private final StorageManagerUtil storageManagerUtil; + private final List<String> storesToBackup; + private final File loggedStoreBaseDir; + private final BlobStoreUtil blobStoreUtil; + + private final BlobStoreBackupManagerMetrics metrics; + + /** + * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from + * last successful task checkpoint or {@link #upload}. 
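+ * A hypothetical example entry (illustrative names only): + * "myStore" -> Pair.of("snapshotIndexBlobId", snapshotIndexForMyStore).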
+ * + * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous + * deployment, if any. + * + * During regular processing, this map is updated after each successful {@link #upload} with the blob id of + * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload. + * + * The contents of this map are used to: + * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}. + * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}. + * + * Since the task commit process guarantees that the async stage of the previous commit is complete before another + * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit. + * + * This field is non-final, since the future itself is replaced in its entirety after init/upload. + * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility + * across threads since the map assignment may happen on a different thread than the one reading the contents. + */ + private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>> + prevStoreSnapshotIndexesFuture; + + public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, + ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, + Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) { + this.jobModel = jobModel; + this.jobName = new JobConfig(config).getName().get(); + this.jobId = new JobConfig(config).getJobId(); + this.containerModel = containerModel; + this.taskModel = taskModel; + this.taskName = taskModel.getTaskName().getTaskName(); + this.executor = backupExecutor; + this.config = config; + this.clock = clock; + this.storageManagerUtil = storageManagerUtil; + StorageConfig storageConfig = new StorageConfig(config); + this.storesToBackup = + storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName()); + this.loggedStoreBaseDir = loggedStoreBaseDir; + this.blobStoreUtil = blobStoreUtil; + this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of()); + this.metrics = blobStoreTaskBackupMetrics; + metrics.initStoreMetrics(storesToBackup); + } + + @Override + public void init(Checkpoint checkpoint) { + long startTime = System.nanoTime(); + LOG.debug("Initializing blob store backup manager for task: {}", taskName); + + // Note: blocks the caller (main) thread. + Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = + BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil); + this.prevStoreSnapshotIndexesFuture = + CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes)); + metrics.initNs.set(System.nanoTime() - startTime); + } + + @Override + public Map<String, String> snapshot(CheckpointId checkpointId) { + // No-op. 
Stores are flushed and checkpoints are created by commit manager + return Collections.emptyMap(); + } + + @Override + public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) { + long uploadStartTime = System.nanoTime(); + + // reset gauges for each upload + metrics.filesToUpload.getValue().set(0L); + metrics.bytesToUpload.getValue().set(0L); + metrics.filesUploaded.getValue().set(0L); + metrics.bytesUploaded.getValue().set(0L); + metrics.filesRemaining.getValue().set(0L); + metrics.bytesRemaining.getValue().set(0L); + metrics.filesToRetain.getValue().set(0L); + metrics.bytesToRetain.getValue().set(0L); + + Map<String, CompletableFuture<Pair<String, SnapshotIndex>>> Review comment: Addressed in second pass. ########## File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java ########## @@ -66,6 +66,9 @@ public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS; public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4); + public static final String BLOB_STORE_BACKEND_ADMIN_FACTORY = "blob.store.backend.admin.factory"; Review comment: Addressed in second pass. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java ########## @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage.blobstore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.storage.StorageManagerUtil; +import org.apache.samza.storage.TaskRestoreManager; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil; +import org.apache.samza.storage.blobstore.util.BlobStoreUtil; +import org.apache.samza.util.FileUtil; +import org.apache.samza.util.FutureUtil; +import org.checkerframework.checker.nullness.Opt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BlobStoreRestoreManager implements TaskRestoreManager { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreRestoreManager.class); + // when checking if checkpoint dir is the same as remote snapshot, exclude the "OFFSET" family of files + // that are written to the checkpoint dir after the remote upload is complete as part of + // TaskStorageCommitManager#writeCheckpointToStoreDirectories.
+ private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of( + StorageManagerUtil.OFFSET_FILE_NAME_LEGACY, + StorageManagerUtil.OFFSET_FILE_NAME_NEW, + StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, + StorageManagerUtil.CHECKPOINT_FILE_NAME); + + private final TaskModel taskModel; + private final String jobName; + private final String jobId; + private final ExecutorService executor; + private final Config config; + private final StorageConfig storageConfig; + private final StorageManagerUtil storageManagerUtil; + private final BlobStoreUtil blobStoreUtil; + private final File loggedBaseDir; + private final File nonLoggedBaseDir; + private final String taskName; + private final List<String> storesToRestore; + + private final BlobStoreRestoreManagerMetrics metrics; + + /** + * Map of store name to a Pair of the blob id of the SnapshotIndex and the corresponding SnapshotIndex from the + * last snapshot creation. + */ + private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes; + + public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, + BlobStoreRestoreManagerMetrics metrics, Config config, StorageManagerUtil storageManagerUtil, + BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) { + this.taskModel = taskModel; + this.jobName = new JobConfig(config).getName().get(); + this.jobId = new JobConfig(config).getJobId(); + this.executor = restoreExecutor; // TODO BLOCKER dchen1 don't block on restore executor + this.config = config; + this.storageConfig = new StorageConfig(config); + this.storageManagerUtil = storageManagerUtil; + this.blobStoreUtil = blobStoreUtil; + this.prevStoreSnapshotIndexes = new HashMap<>(); + this.loggedBaseDir = loggedBaseDir; + this.nonLoggedBaseDir = nonLoggedBaseDir; + this.taskName = taskModel.getTaskName().getTaskName(); + this.storesToRestore = + storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName()); + this.metrics = metrics; + } + + @Override + public void init(Checkpoint checkpoint) { + long startTime = System.nanoTime(); + LOG.debug("Initializing blob store restore manager for task: {}", taskName); + // get previous SCMs from checkpoint + prevStoreSnapshotIndexes = + BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil); + metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime); + LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}", + taskName, prevStoreSnapshotIndexes); + + metrics.initStoreMetrics(storesToRestore); + + // Note: blocks the caller (main) thread. + deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, prevStoreSnapshotIndexes, blobStoreUtil, executor); + metrics.initNs.set(System.nanoTime() - startTime); + } + + /** + * Restore state from checkpoints, state snapshots and changelog. + */ + @Override + public void restore() { + restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, + storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor); + } + + @Override + public void close() { + } Review comment: Addressed in second pass. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/index/SnapshotIndex.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
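+ // These files hold local checkpoint metadata rather than store contents, so excluding them should be safe: + // an otherwise-identical checkpoint dir should not be treated as differing from the remote snapshot.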
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.index; + +import com.google.common.base.Preconditions; + +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + + +/** + * A {@link SnapshotIndex} contains all the information necessary for recreating the local store by + * downloading its contents from the remote blob store. The {@link SnapshotIndex} is itself serialized + * and stored as a blob in the remote store, and its blob id is tracked in the Task checkpoint. + */ +public class SnapshotIndex { + private static final short SCHEMA_VERSION = 1; Review comment: I should have added this to the other schemas (DirIndex, FileIndex, etc.) as well, and it needs to be added to the serde logic. I will do that as part of a follow-up update. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage.blobstore; + +import com.google.common.collect.ImmutableMap; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.storage.StorageManagerUtil; +import org.apache.samza.storage.TaskBackupManager; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil; +import org.apache.samza.storage.blobstore.util.BlobStoreUtil; +import org.apache.samza.storage.blobstore.util.DirDiffUtil; +import org.apache.samza.util.Clock; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BlobStoreBackupManager implements TaskBackupManager { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class); + + private final JobModel jobModel; + private final ExecutorService executor; + private final String jobName; + private final String jobId; + private final ContainerModel containerModel; + private final TaskModel taskModel; + private final String taskName; + private final Config config; + private final Clock clock; + private final StorageManagerUtil storageManagerUtil; + private final List<String> storesToBackup; + private final File loggedStoreBaseDir; + private final BlobStoreUtil blobStoreUtil; + + private final BlobStoreBackupManagerMetrics metrics; + + /** + * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from + * last successful task checkpoint or {@link #upload}. + * + * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous + * deployment, if any. + * + * During regular processing, this map is updated after each successful {@link #upload} with the blob id of + * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload. + * + * The contents of this map are used to: + * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}. + * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}. + * + * Since the task commit process guarantees that the async stage of the previous commit is complete before another + * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit. + * + * This field is non-final, since the future itself is replaced in its entirety after init/upload. 
+ * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility + * across threads since the map assignment may happen on a different thread than the one reading the contents. + */ + private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>> + prevStoreSnapshotIndexesFuture; + + public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, + ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, + Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) { + this.jobModel = jobModel; + this.jobName = new JobConfig(config).getName().get(); + this.jobId = new JobConfig(config).getJobId(); + this.containerModel = containerModel; + this.taskModel = taskModel; + this.taskName = taskModel.getTaskName().getTaskName(); + this.executor = backupExecutor; + this.config = config; + this.clock = clock; + this.storageManagerUtil = storageManagerUtil; + StorageConfig storageConfig = new StorageConfig(config); + this.storesToBackup = + storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName()); + this.loggedStoreBaseDir = loggedStoreBaseDir; + this.blobStoreUtil = blobStoreUtil; + this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of()); + this.metrics = blobStoreTaskBackupMetrics; + metrics.initStoreMetrics(storesToBackup); + } + + @Override + public void init(Checkpoint checkpoint) { + long startTime = System.nanoTime(); + LOG.debug("Initializing blob store backup manager for task: {}", taskName); + + // Note: blocks the caller (main) thread. + Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = + BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil); + this.prevStoreSnapshotIndexesFuture = + CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes)); + metrics.initNs.set(System.nanoTime() - startTime); + } + + @Override + public Map<String, String> snapshot(CheckpointId checkpointId) { + // No-op. 
Stores are flushed and checkpoints are created by commit manager + return Collections.emptyMap(); + } + + @Override + public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) { + long uploadStartTime = System.nanoTime(); + + // reset gauges for each upload + metrics.filesToUpload.getValue().set(0L); + metrics.bytesToUpload.getValue().set(0L); + metrics.filesUploaded.getValue().set(0L); + metrics.bytesUploaded.getValue().set(0L); + metrics.filesRemaining.getValue().set(0L); + metrics.bytesRemaining.getValue().set(0L); + metrics.filesToRetain.getValue().set(0L); + metrics.bytesToRetain.getValue().set(0L); + + Map<String, CompletableFuture<Pair<String, SnapshotIndex>>> + storeToSCMAndSnapshotIndexPairFutures = new HashMap<>(); + Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new HashMap<>(); + + storesToBackup.forEach((storeName) -> { + long storeUploadStartTime = System.nanoTime(); + try { + // metadata for the current store snapshot to upload + SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName); + + // Only durable/persistent stores are passed here from commit manager + // get the local store dir corresponding to the current checkpointId + File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, storeName, + taskModel.getTaskName(), taskModel.getTaskMode()); + String checkpointDirPath = storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId); + File checkpointDir = new File(checkpointDirPath); + + LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}", + taskName, storeName, storeDir, checkpointDir); + + // get the previous store directory contents + DirIndex prevDirIndex; + + // guaranteed to be available since a new task commit may not start until the previous one is complete + Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = + prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS); + + if (prevStoreSnapshotIndexes.containsKey(storeName)) { + prevDirIndex = prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex(); + } else { + // no previous SnapshotIndex means that this is the first commit for this store. Create an empty DirIndex. + prevDirIndex = new DirIndex(checkpointDir.getName(), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList()); + } + + long dirDiffStartTime = System.nanoTime(); + // get the diff between previous and current store directories + DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, BlobStoreUtil.areSameFile(false)); + metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime); + + DirDiff.Stats stats = DirDiff.getStats(dirDiff); + updateStoreDiffMetrics(storeName, stats); + metrics.filesToUpload.getValue().addAndGet(stats.filesAdded); + metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded); + metrics.filesRemaining.getValue().addAndGet(stats.filesAdded); + metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded); + metrics.filesToRetain.getValue().addAndGet(stats.filesRetained); + metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained); + + // upload the diff to the blob store and get the new directory index + CompletionStage<DirIndex> dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); + + CompletionStage<SnapshotIndex> snapshotIndexFuture = + dirIndexFuture.thenApplyAsync(dirIndex -> { + LOG.trace("Dir upload complete. 
Returning new SnapshotIndex for task: {} store: {}.", taskName, storeName); + Optional<String> prevSnapshotIndexBlobId = + Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName)) + .map(Pair::getLeft); + return new SnapshotIndex(clock.currentTimeMillis(), snapshotMetadata, dirIndex, prevSnapshotIndexBlobId); + }, executor); + + // upload the new snapshot index to the blob store and get its blob id + CompletionStage<String> snapshotIndexBlobIdFuture = + snapshotIndexFuture + .thenComposeAsync(si -> { + LOG.trace("Uploading Snapshot index for task: {} store: {}", taskName, storeName); + return blobStoreUtil.putSnapshotIndex(si); + }, executor); + + // update the temporary storeName to previous snapshot index map with the new mapping. Review comment: Updated in second pass ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java ########## @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.storage.StorageManagerUtil; +import org.apache.samza.storage.TaskRestoreManager; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil; +import org.apache.samza.storage.blobstore.util.BlobStoreUtil; +import org.apache.samza.util.FileUtil; +import org.apache.samza.util.FutureUtil; +import org.checkerframework.checker.nullness.Opt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BlobStoreRestoreManager implements TaskRestoreManager { + private 
static final Logger LOG = LoggerFactory.getLogger(BlobStoreRestoreManager.class); + // when checking if checkpoint dir is the same as remote snapshot, exclude the "OFFSET" family of files + // that are written to the checkpoint dir after the remote upload is complete as part of + // TaskStorageCommitManager#writeCheckpointToStoreDirectories. + private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of( + StorageManagerUtil.OFFSET_FILE_NAME_LEGACY, + StorageManagerUtil.OFFSET_FILE_NAME_NEW, + StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, + StorageManagerUtil.CHECKPOINT_FILE_NAME); + + private final TaskModel taskModel; + private final String jobName; + private final String jobId; + private final ExecutorService executor; + private final Config config; + private final StorageConfig storageConfig; + private final StorageManagerUtil storageManagerUtil; + private final BlobStoreUtil blobStoreUtil; + private final File loggedBaseDir; + private final File nonLoggedBaseDir; + private final String taskName; + private final List<String> storesToRestore; + + private final BlobStoreRestoreManagerMetrics metrics; + + /** + * Map of store name to a Pair of the blob id of the SnapshotIndex and the corresponding SnapshotIndex from the + * last snapshot creation. + */ + private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes; + + public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, + BlobStoreRestoreManagerMetrics metrics, Config config, StorageManagerUtil storageManagerUtil, + BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) { + this.taskModel = taskModel; + this.jobName = new JobConfig(config).getName().get(); + this.jobId = new JobConfig(config).getJobId(); + this.executor = restoreExecutor; // TODO BLOCKER dchen1 don't block on restore executor + this.config = config; + this.storageConfig = new StorageConfig(config); + this.storageManagerUtil = storageManagerUtil; + this.blobStoreUtil = blobStoreUtil; + this.prevStoreSnapshotIndexes = new HashMap<>(); + this.loggedBaseDir = loggedBaseDir; + this.nonLoggedBaseDir = nonLoggedBaseDir; + this.taskName = taskModel.getTaskName().getTaskName(); + this.storesToRestore = + storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName()); + this.metrics = metrics; + } + + @Override + public void init(Checkpoint checkpoint) { + long startTime = System.nanoTime(); + LOG.debug("Initializing blob store restore manager for task: {}", taskName); + // get previous SCMs from checkpoint + prevStoreSnapshotIndexes = + BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil); + metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime); + LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}", + taskName, prevStoreSnapshotIndexes); + + metrics.initStoreMetrics(storesToRestore); + + // Note: blocks the caller (main) thread. + deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, prevStoreSnapshotIndexes, blobStoreUtil, executor); + metrics.initNs.set(System.nanoTime() - startTime); + } + + /** + * Restore state from checkpoints, state snapshots and changelog. 
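+ * (This delegates to {@link #restoreStores}, which either reuses an identical local store checkpoint directory + * or downloads the store contents from the remote blob store.)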
+ */ + @Override + public void restore() { + restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, + storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor); + } + + @Override + public void close() { + } + + /** + * Deletes blob store contents for stores that were present in the last checkpoint but are either no longer + * present in job configs (removed by the user since the last deployment) or are no longer configured to be backed + * up using blob stores. + * + * This method blocks until all the necessary store contents and snapshot index blobs have been marked for deletion. + */ + @VisibleForTesting + static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String taskName, StorageConfig storageConfig, + Map<String, Pair<String, SnapshotIndex>> initialStoreSnapshotIndexes, + BlobStoreUtil blobStoreUtil, ExecutorService executor) { + + List<String> storesToBackup = + storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName()); + List<String> storesToRestore = + storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName()); + + List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>(); + initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> { + if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) { + LOG.debug("Removing task: {} store: {} from blob store. It is either no longer used, " + + "or is no longer configured to be backed up or restored with blob store.", taskName, storeName); + DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex(); + Metadata requestMetadata = + new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.empty(), jobName, jobId, taskName, storeName); + CompletionStage<Void> storeDeletionFuture = + blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal + .thenComposeAsync(v -> + blobStoreUtil.deleteDir(dirIndex, requestMetadata), executor) // delete files and dirs still present + .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob( + scmAndSnapshotIndex.getLeft(), requestMetadata), + executor); // delete the snapshot index blob + storeDeletionFutures.add(storeDeletionFuture); + } + }); + + FutureUtil.allOf(storeDeletionFutures).join(); + } + + /** + * Restores all eligible stores in the task. + */ + @VisibleForTesting + static void restoreStores(String jobName, String jobId, TaskName taskName, List<String> storesToRestore, + Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes, + File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics, + StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, + ExecutorService executor) { + long restoreStartTime = System.nanoTime(); + List<CompletionStage<Void>> restoreFutures = new ArrayList<>(); + + LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore); + storesToRestore.forEach(storeName -> { + if (!prevStoreSnapshotIndexes.containsKey(storeName)) { + LOG.debug("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, storeName); + // TODO HIGH shesharm what should we do with the local state already present on disk, if any? + // E.g. this will be the case if the user changes a store from changelog based backup and restore to + // blob store based backup and restore, both at the same time. 
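+ // As written, the restore is simply skipped here and any such pre-existing local state is left untouched.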
+ return; + } + + Pair<String, SnapshotIndex> scmAndSnapshotIndex = prevStoreSnapshotIndexes.get(storeName); + + long storeRestoreStartTime = System.nanoTime(); + SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight(); + DirIndex dirIndex = snapshotIndex.getDirIndex(); + + // TODO MINOR shesharm: calculate recursively similar to DirDiff.Stats + long bytesToRestore = dirIndex.getFilesPresent().stream().mapToLong(fi -> fi.getFileMetadata().getSize()).sum(); + metrics.filesToRestore.getValue().addAndGet(dirIndex.getFilesPresent().size()); + metrics.bytesToRestore.getValue().addAndGet(bytesToRestore); + metrics.filesRemaining.getValue().addAndGet(dirIndex.getFilesPresent().size()); + metrics.bytesRemaining.getValue().addAndGet(bytesToRestore); + + CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId(); + File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, storeName, taskName, TaskMode.Active); + Path storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId)); + LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}", + taskName, storeName, storeDir, storeCheckpointDir); + + // we always delete the store dir to preserve transactional state guarantees. + try { + LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " + + "or remote snapshot.", storeDir); + FileUtils.deleteDirectory(storeDir); + } catch (IOException e) { + throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e); + } + + boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex, + storeCheckpointDir, storageConfig, blobStoreUtil); + + if (shouldRestore) { // restore the store from the remote blob store + LOG.debug("Deleting local store checkpoint directory: {} before restore.", storeCheckpointDir); + // delete all store checkpoint directories. if we only delete the store directory and don't + // delete the checkpoint directories, the store size on disk will grow to 2x after restore + // until the first commit is completed and older checkpoint dirs are deleted. This is + // because the hard-linked checkpoint dir files will no longer be de-duped with the + // now-deleted main store directory contents and will take up additional space of their + // own during the restore. + deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil); + + enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime, + restoreFutures, blobStoreUtil, metrics, executor); + } else { + LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " + + "to the remote snapshot.", storeCheckpointDir, storeDir); + // atomically rename the checkpoint dir to the store dir + new FileUtil().move(storeCheckpointDir.toFile(), storeDir); + + // delete any other checkpoint dirs. + deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil); + } + }); + + // wait for all restores to finish + FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> { + LOG.info("Restore completed for task: {} stores", taskName); + metrics.restoreNs.set(System.nanoTime() - restoreStartTime); + }).join(); // TODO BLOCKER dchen1 make non-blocking. + } + + /** + * Determines if the store needs to be restored from remote snapshot based on local and remote state. 
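+ * Roughly: returns false when a local store checkpoint directory exists for the last successful checkpoint and + * its contents (ignoring FILES_TO_IGNORE) are identical to the remote snapshot, in which case the checkpoint + * directory can be renamed into place instead of re-downloading the store.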
+ */ + @VisibleForTesting + static boolean shouldRestore(String taskName, String storeName, DirIndex dirIndex, + Path storeCheckpointDir, StorageConfig storageConfig, BlobStoreUtil blobStoreUtil) { + // if a store checkpoint directory exists for the last successful task checkpoint, try to use it. + boolean restoreStore; + if (Files.exists(storeCheckpointDir)) { + if (storageConfig.getCleanLoggedStoreDirsOnStart(storeName)) { Review comment: The method name seems right to me. That method just gets the config associated with 'clean logged store dirs on start' boolean. Did you mean something else? ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore; + +import com.google.common.collect.ImmutableMap; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.storage.StorageManagerUtil; +import org.apache.samza.storage.TaskBackupManager; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil; +import org.apache.samza.storage.blobstore.util.BlobStoreUtil; +import org.apache.samza.storage.blobstore.util.DirDiffUtil; +import org.apache.samza.util.Clock; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BlobStoreBackupManager implements TaskBackupManager { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class); + + private final JobModel jobModel; + private final ExecutorService executor; + private final String jobName; + private final String jobId; 
+ private final ContainerModel containerModel; + private final TaskModel taskModel; + private final String taskName; + private final Config config; + private final Clock clock; + private final StorageManagerUtil storageManagerUtil; + private final List<String> storesToBackup; + private final File loggedStoreBaseDir; + private final BlobStoreUtil blobStoreUtil; + + private final BlobStoreBackupManagerMetrics metrics; + + /** + * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from + * last successful task checkpoint or {@link #upload}. + * + * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous + * deployment, if any. + * + * During regular processing, this map is updated after each successful {@link #upload} with the blob id of + * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload. + * + * The contents of this map are used to: + * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}. + * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}. + * + * Since the task commit process guarantees that the async stage of the previous commit is complete before another + * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit. + * + * This field is non-final, since the future itself is replaced in its entirety after init/upload. + * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility + * across threads since the map assignment may happen on a different thread than the one reading the contents. + */ + private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>> + prevStoreSnapshotIndexesFuture; + + public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, + ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, + Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) { + this.jobModel = jobModel; + this.jobName = new JobConfig(config).getName().get(); + this.jobId = new JobConfig(config).getJobId(); + this.containerModel = containerModel; + this.taskModel = taskModel; + this.taskName = taskModel.getTaskName().getTaskName(); + this.executor = backupExecutor; + this.config = config; + this.clock = clock; + this.storageManagerUtil = storageManagerUtil; + StorageConfig storageConfig = new StorageConfig(config); + this.storesToBackup = + storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName()); + this.loggedStoreBaseDir = loggedStoreBaseDir; + this.blobStoreUtil = blobStoreUtil; + this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of()); + this.metrics = blobStoreTaskBackupMetrics; + metrics.initStoreMetrics(storesToBackup); + } + + @Override + public void init(Checkpoint checkpoint) { + long startTime = System.nanoTime(); + LOG.debug("Initializing blob store backup manager for task: {}", taskName); + + // Note: blocks the caller (main) thread. 
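+ // (This should be acceptable here: init runs once per task during container startup, before processing begins.)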
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = + BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil); + this.prevStoreSnapshotIndexesFuture = + CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes)); + metrics.initNs.set(System.nanoTime() - startTime); + } + + @Override + public Map<String, String> snapshot(CheckpointId checkpointId) { + // No-op. Stores are flushed and checkpoints are created by commit manager + return Collections.emptyMap(); + } + + @Override + public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) { + long uploadStartTime = System.nanoTime(); + + // reset gauges for each upload + metrics.filesToUpload.getValue().set(0L); + metrics.bytesToUpload.getValue().set(0L); + metrics.filesUploaded.getValue().set(0L); + metrics.bytesUploaded.getValue().set(0L); + metrics.filesRemaining.getValue().set(0L); + metrics.bytesRemaining.getValue().set(0L); + metrics.filesToRetain.getValue().set(0L); + metrics.bytesToRetain.getValue().set(0L); + + Map<String, CompletableFuture<Pair<String, SnapshotIndex>>> + storeToSCMAndSnapshotIndexPairFutures = new HashMap<>(); + Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new HashMap<>(); + + storesToBackup.forEach((storeName) -> { + long storeUploadStartTime = System.nanoTime(); + try { + // metadata for the current store snapshot to upload + SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName); + + // Only durable/persistent stores are passed here from commit manager + // get the local store dir corresponding to the current checkpointId + File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, storeName, + taskModel.getTaskName(), taskModel.getTaskMode()); + String checkpointDirPath = storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId); + File checkpointDir = new File(checkpointDirPath); + + LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}", + taskName, storeName, storeDir, checkpointDir); + + // get the previous store directory contents + DirIndex prevDirIndex; + + // guaranteed to be available since a new task commit may not start until the previous one is complete + Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = + prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS); + + if (prevStoreSnapshotIndexes.containsKey(storeName)) { + prevDirIndex = prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex(); + } else { + // no previous SnapshotIndex means that this is the first commit for this store. Create an empty DirIndex. 
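+ // (Diffing against an empty DirIndex means every file in the checkpoint directory is treated as newly added.)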
+ prevDirIndex = new DirIndex(checkpointDir.getName(), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList()); + } + + long dirDiffStartTime = System.nanoTime(); + // get the diff between previous and current store directories + DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, BlobStoreUtil.areSameFile(false)); + metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime); + + DirDiff.Stats stats = DirDiff.getStats(dirDiff); + updateStoreDiffMetrics(storeName, stats); + metrics.filesToUpload.getValue().addAndGet(stats.filesAdded); + metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded); + metrics.filesRemaining.getValue().addAndGet(stats.filesAdded); + metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded); + metrics.filesToRetain.getValue().addAndGet(stats.filesRetained); + metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained); + + // upload the diff to the blob store and get the new directory index + CompletionStage<DirIndex> dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); + + CompletionStage<SnapshotIndex> snapshotIndexFuture = + dirIndexFuture.thenApplyAsync(dirIndex -> { + LOG.trace("Dir upload complete. Returning new SnapshotIndex for task: {} store: {}.", taskName, storeName); + Optional<String> prevSnapshotIndexBlobId = + Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName)) + .map(Pair::getLeft); + return new SnapshotIndex(clock.currentTimeMillis(), snapshotMetadata, dirIndex, prevSnapshotIndexBlobId); + }, executor); + + // upload the new snapshot index to the blob store and get its blob id + CompletionStage<String> snapshotIndexBlobIdFuture = + snapshotIndexFuture + .thenComposeAsync(si -> { + LOG.trace("Uploading Snapshot index for task: {} store: {}", taskName, storeName); + return blobStoreUtil.putSnapshotIndex(si); + }, executor); + + // update the temporary storeName to previous snapshot index map with the new mapping. + CompletableFuture<Pair<String, SnapshotIndex>> scmAndSnapshotIndexPairFuture = + FutureUtil.toFutureOfPair( + Pair.of(snapshotIndexBlobIdFuture.toCompletableFuture(), snapshotIndexFuture.toCompletableFuture())); + + scmAndSnapshotIndexPairFuture.whenComplete((res, ex) -> { + long uploadTimeNs = System.nanoTime() - storeUploadStartTime; + metrics.storeUploadNs.get(storeName).update(uploadTimeNs); + }); + + storeToSCMAndSnapshotIndexPairFutures.put(storeName, scmAndSnapshotIndexPairFuture); + storeToSerializedSCMFuture.put(storeName, snapshotIndexBlobIdFuture.toCompletableFuture()); + } catch (Exception e) { + throw new SamzaException( + String.format("Error uploading store snapshot to blob store for task: %s, store: %s, checkpointId: %s", + taskName, storeName, checkpointId), e); + } + }); + + // replace the previous storeName to snapshot index mapping with the new mapping. + this.prevStoreSnapshotIndexesFuture = + FutureUtil.toFutureOfMap(storeToSCMAndSnapshotIndexPairFutures); + + return FutureUtil.toFutureOfMap(storeToSerializedSCMFuture) + .whenComplete((res, ex) -> metrics.uploadNs.update(System.nanoTime() - uploadStartTime)); + } + + /** + * Clean up would be called at the end of every commit as well as on a container start/restart. + * Clean up involves the following steps: + * 1. Remove TTL of the snapshot index blob and for any associated files and sub-dirs marked for retention. + * 2. Delete the files/subdirs marked for deletion in the snapshot index. + * 3. 
Delete the remote {@link SnapshotIndex} blob for the previous checkpoint. + * @param checkpointId the {@link CheckpointId} of the last successfully committed checkpoint. + * @param storeSCMs store name to state checkpoint markers for the last successfully committed checkpoint + */ + @Override + public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, String> storeSCMs) { + long startTime = System.nanoTime(); + List<CompletionStage<Void>> removeTTLFutures = new ArrayList<>(); + List<CompletionStage<Void>> cleanupRemoteSnapshotFutures = new ArrayList<>(); + List<CompletionStage<Void>> removePrevRemoteSnapshotFutures = new ArrayList<>(); + + // SCM, in case of blob store backup and restore, is just the blob id of SnapshotIndex representing the remote snapshot + storeSCMs.forEach((storeName, snapshotIndexBlobId) -> { + Metadata requestMetadata = + new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.empty(), jobName, jobId, taskName, storeName); + CompletionStage<SnapshotIndex> snapshotIndexFuture = + blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId, requestMetadata); + + // 1. remove TTL of index blob and all of its files and sub-dirs marked for retention + CompletionStage<Void> removeTTLFuture = + snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { + LOG.debug("Removing TTL for index blob: {} for task: {} store: {}", + snapshotIndexBlobId, taskName, storeName); + return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata); + }, executor); + removeTTLFutures.add(removeTTLFuture); + + // 2. delete the files/subdirs marked for deletion in the snapshot index. + CompletionStage<Void> cleanupRemoteSnapshotFuture = + snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { + LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", + snapshotIndexBlobId, taskName, storeName); + return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata); + }, executor); + + cleanupRemoteSnapshotFutures.add(cleanupRemoteSnapshotFuture); + + // 3. delete the remote {@link SnapshotIndex} blob for the previous checkpoint. + CompletionStage<Void> removePrevRemoteSnapshotFuture = + snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { + if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) { + String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get(); + LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", + blobId, taskName, storeName); + return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata); + } else { + // complete future immediately. There are no previous snapshot index blobs to delete. + return CompletableFuture.completedFuture(null); + } + }, executor); + removePrevRemoteSnapshotFutures.add(removePrevRemoteSnapshotFuture); + }); + + return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures, removePrevRemoteSnapshotFutures) + .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime() - startTime)); + } + + @Override + public void close() { + // TODO need to init and close blob store manager instances? Review comment: Addressed in second pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
