prateekm commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r636537837
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -719,7 +745,11 @@ class SamzaContainer(
private val jobConfig = new JobConfig(config)
private val taskConfig = new TaskConfig(config)
- val shutdownMs: Long = taskConfig.getShutdownMs
+
+ // Linkedin-specific shutdownMs due to SAMZA-2198; switch back to
TaskConfig.getShutdownMs once that is fixed
+ val shutdownMs: Long = taskConfig.getLong(TaskConfig.TASK_SHUTDOWN_MS, 5000)
Review comment:
Does this need to be here?
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+ private final JobModel jobModel;
+ private final ExecutorService executor;
+ private final String jobName;
+ private final String jobId;
+ private final ContainerModel containerModel;
+ private final TaskModel taskModel;
+ private final String taskName;
+ private final Config config;
+ private final Clock clock;
+ private final StorageManagerUtil storageManagerUtil;
+ private final List<String> storesToBackup;
+ private final File loggedStoreBaseDir;
+ private final BlobStoreUtil blobStoreUtil;
+
+ private final BlobStoreBackupManagerMetrics metrics;
+
+ /**
+ * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the
corresponding {@link SnapshotIndex} from
+ * last successful task checkpoint or {@link #upload}.
+ *
+ * After {@link #init}, the map reflects the contents of the last completed
checkpoint for the task from the previous
+ * deployment, if any.
+ *
+ * During regular processing, this map is updated after each successful
{@link #upload} with the blob id of
+ * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the
upload.
+ *
+ * The contents of this map are used to:
+ * 1. Delete any unused stores from the previous deployment in the remote
store during {@link #init}.
+ * 2. Calculate the diff for local state between the last and the current
checkpoint during {@link #upload}.
+ *
+ * Since the task commit process guarantees that the async stage of the
previous commit is complete before another
+ * commit can start, this future is guaranteed to be complete in the call to
{@link #upload} during the next commit.
+ *
+ * This field is non-final, since the future itself is replaced in its
entirety after init/upload.
+ * The internal map contents are never directly modified (e.g. using puts).
It's volatile to ensure visibility
+ * across threads since the map assignment may happen on a different thread
than the one reading the contents.
+ */
+ private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+ prevStoreSnapshotIndexesFuture;
+
+ public BlobStoreBackupManager(JobModel jobModel, ContainerModel
containerModel, TaskModel taskModel,
+ ExecutorService backupExecutor, BlobStoreBackupManagerMetrics
blobStoreTaskBackupMetrics, Config config,
+ Clock clock, File loggedStoreBaseDir, StorageManagerUtil
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+ this.jobModel = jobModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.containerModel = containerModel;
+ this.taskModel = taskModel;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.executor = backupExecutor;
+ this.config = config;
+ this.clock = clock;
+ this.storageManagerUtil = storageManagerUtil;
+ StorageConfig storageConfig = new StorageConfig(config);
+ this.storesToBackup =
+
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ this.loggedStoreBaseDir = loggedStoreBaseDir;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.of());
+ this.metrics = blobStoreTaskBackupMetrics;
+ metrics.initStoreMetrics(storesToBackup);
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+ // Note: blocks the caller (main) thread.
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId,
taskName, checkpoint, blobStoreUtil);
+ this.prevStoreSnapshotIndexesFuture =
+
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ @Override
+ public Map<String, String> snapshot(CheckpointId checkpointId) {
+ // No-op. Stores are flushed and checkpoints are created by commit manager
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> upload(CheckpointId
checkpointId, Map<String, String> storeSCMs) {
+ long uploadStartTime = System.nanoTime();
+
+ // reset gauges for each upload
+ metrics.filesToUpload.getValue().set(0L);
+ metrics.bytesToUpload.getValue().set(0L);
+ metrics.filesUploaded.getValue().set(0L);
+ metrics.bytesUploaded.getValue().set(0L);
+ metrics.filesRemaining.getValue().set(0L);
+ metrics.bytesRemaining.getValue().set(0L);
+ metrics.filesToRetain.getValue().set(0L);
+ metrics.bytesToRetain.getValue().set(0L);
+
+ Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+ storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+ Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new
HashMap<>();
+
+ storesToBackup.forEach((storeName) -> {
+ long storeUploadStartTime = System.nanoTime();
+ try {
+ // metadata for the current store snapshot to upload
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
+
+ // Only durable/persistent stores are passed here from commit manager
+ // get the local store dir corresponding to the current checkpointId
+ File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir,
storeName,
+ taskModel.getTaskName(), taskModel.getTaskMode());
+ String checkpointDirPath =
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+ File checkpointDir = new File(checkpointDirPath);
+
+ LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+ taskName, storeName, storeDir, checkpointDir);
+
+ // get the previous store directory contents
+ DirIndex prevDirIndex;
+
+ // guaranteed to be available since a new task commit may not start
until the previous one is complete
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+ if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+ prevDirIndex =
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+ } else {
+ // no previous SnapshotIndex means that this is the first commit for
this store. Create an empty DirIndex.
+ prevDirIndex = new DirIndex(checkpointDir.getName(),
Collections.emptyList(), Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList());
+ }
+
+ long dirDiffStartTime = System.nanoTime();
+ // get the diff between previous and current store directories
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
BlobStoreUtil.areSameFile(false));
+ metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() -
dirDiffStartTime);
+
+ DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+ updateStoreDiffMetrics(storeName, stats);
+ metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded);
+ metrics.filesRemaining.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded);
+ metrics.filesToRetain.getValue().addAndGet(stats.filesRetained);
+ metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained);
+
+ // upload the diff to the blob store and get the new directory index
+ CompletionStage<DirIndex> dirIndexFuture =
blobStoreUtil.putDir(dirDiff, snapshotMetadata);
+
+ CompletionStage<SnapshotIndex> snapshotIndexFuture =
+ dirIndexFuture.thenApplyAsync(dirIndex -> {
+ LOG.trace("Dir upload complete. Returning new SnapshotIndex for
task: {} store: {}.", taskName, storeName);
+ Optional<String> prevSnapshotIndexBlobId =
+ Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName))
+ .map(Pair::getLeft);
+ return new SnapshotIndex(clock.currentTimeMillis(),
snapshotMetadata, dirIndex, prevSnapshotIndexBlobId);
+ }, executor);
+
+ // upload the new snapshot index to the blob store and get its blob id
+ CompletionStage<String> snapshotIndexBlobIdFuture =
+ snapshotIndexFuture
+ .thenComposeAsync(si -> {
+ LOG.trace("Uploading Snapshot index for task: {} store: {}",
taskName, storeName);
+ return blobStoreUtil.putSnapshotIndex(si);
+ }, executor);
+
+ // update the temporary storeName to previous snapshot index map with
the new mapping.
+ CompletableFuture<Pair<String, SnapshotIndex>>
scmAndSnapshotIndexPairFuture =
+ FutureUtil.toFutureOfPair(
+ Pair.of(snapshotIndexBlobIdFuture.toCompletableFuture(),
snapshotIndexFuture.toCompletableFuture()));
+
+ scmAndSnapshotIndexPairFuture.whenComplete((res, ex) -> {
+ long uploadTimeNs = System.nanoTime() - storeUploadStartTime;
+ metrics.storeUploadNs.get(storeName).update(uploadTimeNs);
+ });
+
+ storeToSCMAndSnapshotIndexPairFutures.put(storeName,
scmAndSnapshotIndexPairFuture);
+ storeToSerializedSCMFuture.put(storeName,
snapshotIndexBlobIdFuture.toCompletableFuture());
+ } catch (Exception e) {
+ throw new SamzaException(
+ String.format("Error uploading store snapshot to blob store for
task: %s, store: %s, checkpointId: %s",
+ taskName, storeName, checkpointId), e);
+ }
+ });
+
+ // replace the previous storeName to snapshot index mapping with the new
mapping.
+ this.prevStoreSnapshotIndexesFuture =
+ FutureUtil.toFutureOfMap(storeToSCMAndSnapshotIndexPairFutures);
+
+ return FutureUtil.toFutureOfMap(storeToSerializedSCMFuture)
+ .whenComplete((res, ex) -> metrics.uploadNs.update(System.nanoTime() -
uploadStartTime));
+ }
+
+ /**
+ * Clean up would be called at the end of every commit as well as on a
container start/restart.
+ * Clean up involves the following steps:
+ * 1. Remove TTL of the snapshot index blob and for any associated files and
sub-dirs marked for retention.
+ * 2. Delete the files/subdirs marked for deletion in the snapshot index.
+ * 3. Delete the remote {@link SnapshotIndex} blob for the previous
checkpoint.
+ * @param checkpointId the {@link CheckpointId} of the last successfully
committed checkpoint.
+ * @param storeSCMs store name to state checkpoint markers for the last
successfully committed checkpoint
+ */
+ @Override
+ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId,
Map<String, String> storeSCMs) {
+ long startTime = System.nanoTime();
+ List<CompletionStage<Void>> removeTTLFutures = new ArrayList<>();
+ List<CompletionStage<Void>> cleanupRemoteSnapshotFutures = new
ArrayList<>();
+ List<CompletionStage<Void>> removePrevRemoteSnapshotFutures = new
ArrayList<>();
+
+ // SCM, in case of blob store backup and restore, is just the blob id of
SnapshotIndex representing the remote snapshot
+ storeSCMs.forEach((storeName, snapshotIndexBlobId) -> {
Review comment:
Can you clarify what you changed here, if anything, and why? Is this an issue
or not?
Also, can you use the storesToBackup map instead of creating it again?
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.storage.KafkaChangelogRestoreParams;
+import org.apache.samza.storage.BlobStoreAdminFactory;
+import org.apache.samza.storage.StateBackendFactory;
+import org.apache.samza.storage.StateBackendAdmin;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.TaskRestoreManager;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ReflectionUtil;
+
+
+public class BlobStoreStateBackendFactory implements StateBackendFactory {
+ @Override
+ public TaskBackupManager getBackupManager(
+ JobModel jobModel,
+ ContainerModel containerModel,
+ TaskModel taskModel,
+ ExecutorService backupExecutor,
+ MetricsRegistry metricsRegistry,
+ Config config,
+ Clock clock,
+ File loggedStoreBaseDir,
+ File nonLoggedStoreBaseDir) {
+ StorageConfig storageConfig = new StorageConfig(config);
+ String blobStoreManagerFactory =
storageConfig.getBlobStoreManagerFactory();
+ Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory));
+ BlobStoreManagerFactory factory =
ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
+ BlobStoreManager blobStoreManager =
factory.getBackupBlobStoreManager(config, backupExecutor);
+ BlobStoreBackupManagerMetrics metrics = new
BlobStoreBackupManagerMetrics(metricsRegistry);
+ BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager,
backupExecutor, metrics, null);
Review comment:
+1 for keeping them separate, since some of the metrics have similar
names and functionality, just in opposite directions (upload vs download).
Agree that it's not that nice for BlobStoreUtil, but I prefer this tradeoff too.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+ private final JobModel jobModel;
+ private final ExecutorService executor;
+ private final String jobName;
+ private final String jobId;
+ private final ContainerModel containerModel;
+ private final TaskModel taskModel;
+ private final String taskName;
+ private final Config config;
+ private final Clock clock;
+ private final StorageManagerUtil storageManagerUtil;
+ private final List<String> storesToBackup;
+ private final File loggedStoreBaseDir;
+ private final BlobStoreUtil blobStoreUtil;
+
+ private final BlobStoreBackupManagerMetrics metrics;
+
+ /**
+ * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the
corresponding {@link SnapshotIndex} from
+ * last successful task checkpoint or {@link #upload}.
+ *
+ * After {@link #init}, the map reflects the contents of the last completed
checkpoint for the task from the previous
+ * deployment, if any.
+ *
+ * During regular processing, this map is updated after each successful
{@link #upload} with the blob id of
+ * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the
upload.
+ *
+ * The contents of this map are used to:
+ * 1. Delete any unused stores from the previous deployment in the remote
store during {@link #init}.
+ * 2. Calculate the diff for local state between the last and the current
checkpoint during {@link #upload}.
+ *
+ * Since the task commit process guarantees that the async stage of the
previous commit is complete before another
+ * commit can start, this future is guaranteed to be complete in the call to
{@link #upload} during the next commit.
+ *
+ * This field is non-final, since the future itself is replaced in its
entirety after init/upload.
+ * The internal map contents are never directly modified (e.g. using puts).
It's volatile to ensure visibility
+ * across threads since the map assignment may happen on a different thread
than the one reading the contents.
+ */
+ private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+ prevStoreSnapshotIndexesFuture;
+
+ public BlobStoreBackupManager(JobModel jobModel, ContainerModel
containerModel, TaskModel taskModel,
+ ExecutorService backupExecutor, BlobStoreBackupManagerMetrics
blobStoreTaskBackupMetrics, Config config,
+ Clock clock, File loggedStoreBaseDir, StorageManagerUtil
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+ this.jobModel = jobModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.containerModel = containerModel;
+ this.taskModel = taskModel;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.executor = backupExecutor;
+ this.config = config;
+ this.clock = clock;
+ this.storageManagerUtil = storageManagerUtil;
+ StorageConfig storageConfig = new StorageConfig(config);
+ this.storesToBackup =
+
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ this.loggedStoreBaseDir = loggedStoreBaseDir;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.of());
+ this.metrics = blobStoreTaskBackupMetrics;
+ metrics.initStoreMetrics(storesToBackup);
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+ // Note: blocks the caller (main) thread.
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId,
taskName, checkpoint, blobStoreUtil);
+ this.prevStoreSnapshotIndexesFuture =
+
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ @Override
+ public Map<String, String> snapshot(CheckpointId checkpointId) {
+ // No-op. Stores are flushed and checkpoints are created by commit manager
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> upload(CheckpointId
checkpointId, Map<String, String> storeSCMs) {
+ long uploadStartTime = System.nanoTime();
+
+ // reset gauges for each upload
+ metrics.filesToUpload.getValue().set(0L);
+ metrics.bytesToUpload.getValue().set(0L);
+ metrics.filesUploaded.getValue().set(0L);
+ metrics.bytesUploaded.getValue().set(0L);
+ metrics.filesRemaining.getValue().set(0L);
+ metrics.bytesRemaining.getValue().set(0L);
+ metrics.filesToRetain.getValue().set(0L);
+ metrics.bytesToRetain.getValue().set(0L);
+
+ Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+ storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+ Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new
HashMap<>();
+
+ storesToBackup.forEach((storeName) -> {
+ long storeUploadStartTime = System.nanoTime();
+ try {
+ // metadata for the current store snapshot to upload
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
+
+ // Only durable/persistent stores are passed here from commit manager
+ // get the local store dir corresponding to the current checkpointId
+ File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir,
storeName,
+ taskModel.getTaskName(), taskModel.getTaskMode());
+ String checkpointDirPath =
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+ File checkpointDir = new File(checkpointDirPath);
+
+ LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+ taskName, storeName, storeDir, checkpointDir);
+
+ // get the previous store directory contents
+ DirIndex prevDirIndex;
+
+ // guaranteed to be available since a new task commit may not start
until the previous one is complete
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+ if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+ prevDirIndex =
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+ } else {
+ // no previous SnapshotIndex means that this is the first commit for
this store. Create an empty DirIndex.
+ prevDirIndex = new DirIndex(checkpointDir.getName(),
Collections.emptyList(), Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList());
+ }
+
+ long dirDiffStartTime = System.nanoTime();
+ // get the diff between previous and current store directories
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
BlobStoreUtil.areSameFile(false));
+ metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() -
dirDiffStartTime);
+
+ DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+ updateStoreDiffMetrics(storeName, stats);
+ metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
Review comment:
What do you mean by "moved to the stats class"?
##########
File path: samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
##########
@@ -0,0 +1,81 @@
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+/**
+ * Config related helper methods for BlobStore.
+ */
+public class BlobStoreConfig extends MapConfig {
+ private static final String STORE_PREFIX = "stores.";
+ private static final String CHANGELOG_SUFFIX = ".changelog";
+
+ public static final String DEFAULT_BLOB_STORE_STATE_BACKEND_FACTORY =
+ "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory";
+ public static final String BLOB_STORE_MANAGER_FACTORY =
"blob.store.manager.factory";
+ public static final String BLOB_STORE_STATE_BACKEND_ADMIN_FACTORY =
"blob.store.state.backend.admin.factory";
+
+ public static final String DEFAULT_STORE_STATE_BACKEND_FACTORY =
+ "org.apache.samza.storage.KafkaChangelogStateBackendFactory";
+ public static final List<String>
DEFAULT_STORE_STATE_BACKEND_BACKUP_FACTORIES = ImmutableList.of(
+ DEFAULT_STORE_STATE_BACKEND_FACTORY);
+ public static final String STORE_STATE_BACKEND_BACKUP_FACTORIES =
STORE_PREFIX + "%s.state.backend.backup.factories";
Review comment:
BLOCKER: These are not blob store manager specific; they should be in
StorageConfig.
Only getBlobStoreManagerFactory and getBlobStoreAdminFactory should be here.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Provides interface for common blob store operations: GET, PUT and DELETE
+ */
+@InterfaceStability.Unstable
+public interface BlobStoreManager {
+ /**
+ * init method to initialize underlying blob store client, if necessary
+ */
+ void init();
Review comment:
Add newline between this and next method too.
##########
File path: samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
##########
@@ -0,0 +1,81 @@
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+/**
+ * Config related helper methods for BlobStore.
+ */
+public class BlobStoreConfig extends MapConfig {
+ private static final String STORE_PREFIX = "stores.";
+ private static final String CHANGELOG_SUFFIX = ".changelog";
+
+ public static final String DEFAULT_BLOB_STORE_STATE_BACKEND_FACTORY =
Review comment:
1. Does this need to be here, or can we remove it and use class.getName()
instead?
2. If it does need to be here, rename field name and remove DEFAULT_. Rename
DEFAULT_STORE_STATE_BACKEND_FACTORY to KAFKA_STATE_BACKEND_FACTORY. and change
DEFAULT_STORE_BACKUP_FACTORIES to refer to these two properties instead.
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -500,8 +519,9 @@ object SamzaContainer extends Logging {
val loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig,
defaultStoreBaseDir)
info("Got base directory for logged data stores: %s" format
loggedStorageBaseDir)
+ // TODO dchen should we enforce restore factories to be subset of backup
factories?
Review comment:
TODO HIGH dchen. @dxichen let's fix this in the "config scope and
precedence" follow up for async commit.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+ private final JobModel jobModel;
+ private final ExecutorService executor;
+ private final String jobName;
+ private final String jobId;
+ private final ContainerModel containerModel;
+ private final TaskModel taskModel;
+ private final String taskName;
+ private final Config config;
+ private final Clock clock;
+ private final StorageManagerUtil storageManagerUtil;
+ private final List<String> storesToBackup;
+ private final File loggedStoreBaseDir;
+ private final BlobStoreUtil blobStoreUtil;
+
+ private final BlobStoreBackupManagerMetrics metrics;
+
+ /**
+ * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the
corresponding {@link SnapshotIndex} from
+ * last successful task checkpoint or {@link #upload}.
+ *
+ * After {@link #init}, the map reflects the contents of the last completed
checkpoint for the task from the previous
+ * deployment, if any.
+ *
+ * During regular processing, this map is updated after each successful
{@link #upload} with the blob id of
+ * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the
upload.
+ *
+ * The contents of this map are used to calculate the diff for local state
between the last and the current checkpoint
+ * during {@link #upload}.
+ *
+ * Since the task commit process guarantees that the async stage of the
previous commit is complete before another
+ * commit can start, this future is guaranteed to be complete in the call to
{@link #upload} during the next commit.
+ *
+ * This field is non-final, since the future itself is replaced in its
entirety after init/upload.
+ * The internal map contents are never directly modified (e.g. using puts).
It's volatile to ensure visibility
+ * across threads since the map assignment may happen on a different thread
than the one reading the contents.
+ */
+ private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+ prevStoreSnapshotIndexesFuture;
+
+ public BlobStoreBackupManager(JobModel jobModel, ContainerModel
containerModel, TaskModel taskModel,
+ ExecutorService backupExecutor, BlobStoreBackupManagerMetrics
blobStoreTaskBackupMetrics, Config config,
+ Clock clock, File loggedStoreBaseDir, StorageManagerUtil
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+ this.jobModel = jobModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.containerModel = containerModel;
+ this.taskModel = taskModel;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.executor = backupExecutor;
+ this.config = config;
+ this.clock = clock;
+ this.storageManagerUtil = storageManagerUtil;
+ BlobStoreConfig blobStoreConfig = new BlobStoreConfig(config);
+ StorageConfig storageConfig = new StorageConfig(config);
+ this.storesToBackup =
+
blobStoreConfig.getStoresWithStateBackendBackupFactory(storageConfig.getStoreNames(),
+ BlobStoreStateBackendFactory.class.getName());
+ this.loggedStoreBaseDir = loggedStoreBaseDir;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.of());
+ this.metrics = blobStoreTaskBackupMetrics;
+ metrics.initStoreMetrics(storesToBackup);
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+ blobStoreUtil.initBlobStoreManager();
+
+ // Note: blocks the caller thread.
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName,
checkpoint);
+ this.prevStoreSnapshotIndexesFuture =
+
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ @Override
+ public Map<String, String> snapshot(CheckpointId checkpointId) {
+ // No-op. Stores are flushed and checkpoints are created by commit manager
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> upload(CheckpointId
checkpointId, Map<String, String> storeSCMs) {
+ long uploadStartTime = System.nanoTime();
+
+ // reset gauges for each upload
+ metrics.filesToUpload.getValue().set(0L);
+ metrics.bytesToUpload.getValue().set(0L);
+ metrics.filesUploaded.getValue().set(0L);
+ metrics.bytesUploaded.getValue().set(0L);
+ metrics.filesRemaining.getValue().set(0L);
+ metrics.bytesRemaining.getValue().set(0L);
+ metrics.filesToRetain.getValue().set(0L);
+ metrics.bytesToRetain.getValue().set(0L);
+
+ // This map is used to atomically replace the
prevStoreSnapshotIndexesFuture map at the end of the task commit
+ Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+ storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+ // This map is used to return serialized State Checkpoint Markers to the
caller
+ Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new
HashMap<>();
+
+ storesToBackup.forEach((storeName) -> {
+ long storeUploadStartTime = System.nanoTime();
+ try {
+ // metadata for the current store snapshot to upload
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
+
+ // get the local store dir corresponding to the current checkpointId
+ File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir,
storeName,
+ taskModel.getTaskName(), taskModel.getTaskMode());
+ String checkpointDirPath =
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+ File checkpointDir = new File(checkpointDirPath);
+
+ LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+ taskName, storeName, storeDir, checkpointDir);
+
+ // guaranteed to be available since a new task commit may not start
until the previous one is complete
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+ // get the previous store directory contents
+ DirIndex prevDirIndex;
+
+ if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+ prevDirIndex =
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+ } else {
+ // no previous SnapshotIndex means that this is the first commit for
this store. Create an empty DirIndex.
+ prevDirIndex = new DirIndex(checkpointDir.getName(),
Collections.emptyList(), Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList());
+ }
+
+ long dirDiffStartTime = System.nanoTime();
+ // get the diff between previous and current store directories
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
DirDiffUtil.areSameFile(false));
+ metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() -
dirDiffStartTime);
+
+ DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+ updateStoreDiffMetrics(storeName, stats);
+ metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded);
+ // Note: FilesRemaining metric is set to FilesAdded in the beginning
of the current upload and then counted down
+ // for each upload.
+ metrics.filesRemaining.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded);
+ metrics.filesToRetain.getValue().addAndGet(stats.filesRetained);
+ metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained);
+
+ // upload the diff to the blob store and get the new directory index
+ CompletionStage<DirIndex> dirIndexFuture =
blobStoreUtil.putDir(dirDiff, snapshotMetadata);
+
+ CompletionStage<SnapshotIndex> snapshotIndexFuture =
+ dirIndexFuture.thenApplyAsync(dirIndex -> {
+ LOG.trace("Dir upload complete. Returning new SnapshotIndex for
task: {} store: {}.", taskName, storeName);
+ Optional<String> prevSnapshotIndexBlobId =
+
Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName)).map(Pair::getLeft);
+ return new SnapshotIndex(clock.currentTimeMillis(),
snapshotMetadata, dirIndex, prevSnapshotIndexBlobId);
+ }, executor);
+
+ // upload the new snapshot index to the blob store and get its blob id
+ CompletionStage<String> snapshotIndexBlobIdFuture =
+ snapshotIndexFuture
+ .thenComposeAsync(si -> {
+ LOG.trace("Uploading Snapshot index for task: {} store: {}",
taskName, storeName);
+ return blobStoreUtil.putSnapshotIndex(si);
+ }, executor);
+
+ // update the map of storeName to previous snapshot index
storeToSCMAndSnapshotIndexPairFutures which is temporary
+ // map used to atomically update prevStoreSnapshotIndexesFuture at the
end of the commit with the new mapping.
+ CompletableFuture<Pair<String, SnapshotIndex>>
scmAndSnapshotIndexPairFuture =
+ FutureUtil.toFutureOfPair(
+ Pair.of(snapshotIndexBlobIdFuture.toCompletableFuture(),
snapshotIndexFuture.toCompletableFuture()));
+
+ scmAndSnapshotIndexPairFuture.whenComplete((res, ex) -> {
+ long uploadTimeNs = System.nanoTime() - storeUploadStartTime;
+ metrics.storeUploadNs.get(storeName).update(uploadTimeNs);
+ });
+
+ storeToSCMAndSnapshotIndexPairFutures.put(storeName,
scmAndSnapshotIndexPairFuture);
+ storeToSerializedSCMFuture.put(storeName,
snapshotIndexBlobIdFuture.toCompletableFuture());
+ } catch (Exception e) {
+ throw new SamzaException(
+ String.format("Error uploading store snapshot to blob store for
task: %s, store: %s, checkpointId: %s",
+ taskName, storeName, checkpointId), e);
+ }
+ });
+
+ // replace the previous storeName to snapshot index mapping with the new
mapping.
+ this.prevStoreSnapshotIndexesFuture =
+ FutureUtil.toFutureOfMap(storeToSCMAndSnapshotIndexPairFutures);
+
+ return FutureUtil.toFutureOfMap(storeToSerializedSCMFuture)
+ .whenComplete((res, ex) -> metrics.uploadNs.update(System.nanoTime() -
uploadStartTime));
+ }
+
+ /**
+ * Clean up would be called at the end of every commit as well as on a
container start/restart.
+ * Clean up involves the following steps:
+ * 1. Remove TTL of the snapshot index blob and for any associated files and
sub-dirs marked for retention.
+ * 2. Delete the files/subdirs marked for deletion in the snapshot index.
+ * 3. Delete the remote {@link SnapshotIndex} blob for the previous
checkpoint.
+ * @param checkpointId the {@link CheckpointId} of the last successfully
committed checkpoint.
+ * @param storeSCMs store name to state checkpoint markers for the last
successfully committed checkpoint
+ */
+ @Override
+ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId,
Map<String, String> storeSCMs) {
+ long startTime = System.nanoTime();
+ List<CompletionStage<Void>> removeTTLFutures = new ArrayList<>();
+ List<CompletionStage<Void>> cleanupRemoteSnapshotFutures = new
ArrayList<>();
+ List<CompletionStage<Void>> removePrevRemoteSnapshotFutures = new
ArrayList<>();
+
+ List<String> storeNames = new StorageConfig(config).getStoreNames();
+ List<String> storesWithBlobStoreStateBackend =
+ new BlobStoreConfig(config)
+ .getStoresWithStateBackendBackupFactory(storeNames,
BlobStoreStateBackendFactory.class.getName());
+
+ // SCM, in case of blob store backup and restore, is just the blob id of
SnapshotIndex representing the remote snapshot
+ storeSCMs.forEach((storeName, snapshotIndexBlobId) -> {
+ // Only perform cleanup for stores configured with BlobStore State
Backend Factory
+ if (storesWithBlobStoreStateBackend.contains(storeName)) {
+ Metadata requestMetadata =
+ new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH,
Optional.empty(), jobName, jobId, taskName, storeName);
+ CompletionStage<SnapshotIndex> snapshotIndexFuture =
+ blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId,
requestMetadata);
+
+ // 1. remove TTL of index blob and all of its files and sub-dirs
marked for retention
+ CompletionStage<Void> removeTTLFuture =
+ snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+ LOG.debug("Removing TTL for index blob: {} and all of its files
and sub-dirs for task: {} store :{}",
+ snapshotIndexBlobId, taskName, storeName);
+ return blobStoreUtil.removeTTL(snapshotIndexBlobId,
snapshotIndex, requestMetadata);
+ }, executor);
+ removeTTLFutures.add(removeTTLFuture);
+
+ // 2. delete the files/subdirs marked for deletion in the snapshot
index.
+ CompletionStage<Void> cleanupRemoteSnapshotFuture =
+ snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+ LOG.debug("Deleting files and dirs to remove for current index
blob: {} for task: {} store: {}",
+ snapshotIndexBlobId, taskName, storeName);
+ return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(),
requestMetadata);
+ }, executor);
+
+ cleanupRemoteSnapshotFutures.add(cleanupRemoteSnapshotFuture);
+
+ // 3. delete the remote {@link SnapshotIndex} blob for the previous
checkpoint.
+ CompletionStage<Void> removePrevRemoteSnapshotFuture =
+ snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
+ if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
+ String blobId =
snapshotIndex.getPrevSnapshotIndexBlobId().get();
+ LOG.debug("Removing previous snapshot index blob: {} from blob
store for task: {} store: {}.",
+ blobId, taskName, storeName);
+ return blobStoreUtil.deleteSnapshotIndexBlob(blobId,
requestMetadata);
+ } else {
+ // complete future immediately. There are no previous
snapshots index blobs to delete.
+ return CompletableFuture.completedFuture(null);
+ }
+ }, executor);
+ removePrevRemoteSnapshotFutures.add(removePrevRemoteSnapshotFuture);
+ }
+ });
+
+ return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures,
removePrevRemoteSnapshotFutures)
+ .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime()
- startTime));
+ }
+
+ @Override
+ public void close() {
+ blobStoreUtil.closeBlobStoreManager();
Review comment:
Why is BlobStoreUtil proxying the close call to the Manager?
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/DeletedException.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.exceptions;
+
+public class DeletedException extends RuntimeException {
Review comment:
Follow Up: Document that blob store managers should use these exceptions
for the relevant errors, because the blob store agnostic error handling and
retry logic in backup and restore managers depends on these. RetriableException
is obvious. For DeletedException, document what the error-handling behavior is
(e.g. ignores deleted blobs during initial cleanup / snapshot index read etc.)
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+ private final JobModel jobModel;
+ private final ExecutorService executor;
+ private final String jobName;
+ private final String jobId;
+ private final ContainerModel containerModel;
+ private final TaskModel taskModel;
+ private final String taskName;
+ private final Config config;
+ private final Clock clock;
+ private final StorageManagerUtil storageManagerUtil;
+ private final List<String> storesToBackup;
+ private final File loggedStoreBaseDir;
+ private final BlobStoreUtil blobStoreUtil;
+
+ private final BlobStoreBackupManagerMetrics metrics;
+
+ /**
+ * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the
corresponding {@link SnapshotIndex} from
+ * last successful task checkpoint or {@link #upload}.
+ *
+ * After {@link #init}, the map reflects the contents of the last completed
checkpoint for the task from the previous
+ * deployment, if any.
+ *
+ * During regular processing, this map is updated after each successful
{@link #upload} with the blob id of
+ * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the
upload.
+ *
+ * The contents of this map are used to:
+ * 1. Delete any unused stores from the previous deployment in the remote
store during {@link #init}.
+ * 2. Calculate the diff for local state between the last and the current
checkpoint during {@link #upload}.
+ *
+ * Since the task commit process guarantees that the async stage of the
previous commit is complete before another
+ * commit can start, this future is guaranteed to be complete in the call to
{@link #upload} during the next commit.
+ *
+ * This field is non-final, since the future itself is replaced in its
entirety after init/upload.
+ * The internal map contents are never directly modified (e.g. using puts).
It's volatile to ensure visibility
+ * across threads since the map assignment may happen on a different thread
than the one reading the contents.
+ */
+ private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+ prevStoreSnapshotIndexesFuture;
+
+ public BlobStoreBackupManager(JobModel jobModel, ContainerModel
containerModel, TaskModel taskModel,
+ ExecutorService backupExecutor, BlobStoreBackupManagerMetrics
blobStoreTaskBackupMetrics, Config config,
+ Clock clock, File loggedStoreBaseDir, StorageManagerUtil
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+ this.jobModel = jobModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.containerModel = containerModel;
+ this.taskModel = taskModel;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.executor = backupExecutor;
+ this.config = config;
+ this.clock = clock;
+ this.storageManagerUtil = storageManagerUtil;
+ StorageConfig storageConfig = new StorageConfig(config);
+ this.storesToBackup =
+
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ this.loggedStoreBaseDir = loggedStoreBaseDir;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.of());
+ this.metrics = blobStoreTaskBackupMetrics;
+ metrics.initStoreMetrics(storesToBackup);
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+ // Note: blocks the caller (main) thread.
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId,
taskName, checkpoint, blobStoreUtil);
+ this.prevStoreSnapshotIndexesFuture =
+
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ @Override
+ public Map<String, String> snapshot(CheckpointId checkpointId) {
+ // No-op. Stores are flushed and checkpoints are created by commit manager
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> upload(CheckpointId
checkpointId, Map<String, String> storeSCMs) {
+ long uploadStartTime = System.nanoTime();
+
+ // reset gauges for each upload
+ metrics.filesToUpload.getValue().set(0L);
+ metrics.bytesToUpload.getValue().set(0L);
+ metrics.filesUploaded.getValue().set(0L);
+ metrics.bytesUploaded.getValue().set(0L);
+ metrics.filesRemaining.getValue().set(0L);
+ metrics.bytesRemaining.getValue().set(0L);
+ metrics.filesToRetain.getValue().set(0L);
+ metrics.bytesToRetain.getValue().set(0L);
+
+ Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+ storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+ Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new
HashMap<>();
+
+ storesToBackup.forEach((storeName) -> {
Review comment:
This is already async. This is just collecting the futures in the 2 maps
above.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskRestoreManager;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.FutureUtil;
+import org.checkerframework.checker.nullness.Opt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreRestoreManager implements TaskRestoreManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreRestoreManager.class);
+ // when checking if checkpoint dir is the same as remote snapshot, exclude
the "OFFSET" family of files
+ // that are written to the checkpoint dir after the remote upload is
complete as part of
+ // TaskStorageCommitManager#writeCheckpointToStoreDirectories.
+ private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(
+ StorageManagerUtil.OFFSET_FILE_NAME_LEGACY,
+ StorageManagerUtil.OFFSET_FILE_NAME_NEW,
+ StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY,
+ StorageManagerUtil.CHECKPOINT_FILE_NAME);
+
+ private final TaskModel taskModel;
+ private final String jobName;
+ private final String jobId;
+ private final ExecutorService executor;
+ private final Config config;
+ private final StorageConfig storageConfig;
+ private final StorageManagerUtil storageManagerUtil;
+ private final BlobStoreUtil blobStoreUtil;
+ private final File loggedBaseDir;
+ private final File nonLoggedBaseDir;
+ private final String taskName;
+ private final List<String> storesToRestore;
+
+ private final BlobStoreRestoreManagerMetrics metrics;
+
+ /**
+ * Map of store name and Pair of blob id of SnapshotIndex and the
corresponding SnapshotIndex from last snapshot
+ * creation
+ */
+ private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
+
+ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService
restoreExecutor,
+ BlobStoreRestoreManagerMetrics metrics, Config config,
StorageManagerUtil storageManagerUtil,
+ BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) {
+ this.taskModel = taskModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.executor = restoreExecutor; // TODO BLOCKER dchen1 dont block on
restore executor
+ this.config = config;
+ this.storageConfig = new StorageConfig(config);
+ this.storageManagerUtil = storageManagerUtil;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexes = new HashMap<>();
+ this.loggedBaseDir = loggedBaseDir;
+ this.nonLoggedBaseDir = nonLoggedBaseDir;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.storesToRestore =
+
storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+ this.metrics = metrics;
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store restore manager for task: {}",
taskName);
+ // get previous SCMs from checkpoint
+ prevStoreSnapshotIndexes =
+ BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId,
taskName, checkpoint, blobStoreUtil);
+ metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
+ LOG.trace("Found previous snapshot index during blob store restore manager
init for task: {} to be: {}",
+ taskName, prevStoreSnapshotIndexes);
+
+ metrics.initStoreMetrics(storesToRestore);
+
+ // Note: blocks the caller (main) thread.
+ deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig,
prevStoreSnapshotIndexes, blobStoreUtil, executor);
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ /**
+ * Restore state from checkpoints, state snapshots and changelog.
+ */
+ @Override
+ public void restore() {
+ restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore,
prevStoreSnapshotIndexes, loggedBaseDir,
+ storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor);
+ }
+
+ // No-op: this method currently releases nothing.
+ @Override
+ public void close() {
+ }
+
+ /**
+ * Deletes blob store contents for stores that were present in the last
checkpoint but are either no longer
+ * present in job configs (removed by user since last deploymetn) or are no
longer configured to be backed
+ * up using blob stores.
+ *
+ * This method blocks until all the necessary store contents and snapshot
index blobs have been marked for deletion.
+ */
+ @VisibleForTesting
+ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId,
String taskName, StorageConfig storageConfig,
+ Map<String, Pair<String, SnapshotIndex>> initialStoreSnapshotIndexes,
+ BlobStoreUtil blobStoreUtil, ExecutorService executor) {
+
+ List<String> storesToBackup =
+
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ List<String> storesToRestore =
+
storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+
+ List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
+ initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
+ if (!storesToBackup.contains(storeName) &&
!storesToRestore.contains(storeName)) {
+ LOG.debug("Removing task: {} store: {} from blob store. It is either
no longer used, " +
+ "or is no longer configured to be backed up or restored with blob
store.", taskName, storeName);
+ DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
+ Metadata requestMetadata =
+ new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX,
Optional.empty(), jobName, jobId, taskName, storeName);
+ CompletionStage<Void> storeDeletionFuture =
+ blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete
files and sub-dirs previously marked for removal
+ .thenComposeAsync(v ->
+ blobStoreUtil.deleteDir(dirIndex, requestMetadata),
executor) // deleted files and dirs still present
+ .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
+ scmAndSnapshotIndex.getLeft(), requestMetadata),
+ executor); // delete the snapshot index blob
+ storeDeletionFutures.add(storeDeletionFuture);
+ }
+ });
+
+ FutureUtil.allOf(storeDeletionFutures).join();
+ }
+
+ /**
+ * Restores all eligible stores in the task.
+ */
+ @VisibleForTesting
+ static void restoreStores(String jobName, String jobId, TaskName taskName,
List<String> storesToRestore,
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
+ File loggedBaseDir, StorageConfig storageConfig,
BlobStoreRestoreManagerMetrics metrics,
+ StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
+ ExecutorService executor) {
+ long restoreStartTime = System.nanoTime();
+ List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
+
+ LOG.debug("Starting restore for task: {} stores: {}", taskName,
storesToRestore);
+ storesToRestore.forEach(storeName -> {
+ if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
+ LOG.debug("No checkpointed snapshot index found for task: {} store:
{}. Skipping restore.", taskName, storeName);
+ // TODO HIGH shesharm what should we do with the local state already
present on disk, if any?
+ // E.g. this will be the case if user changes a store from changelog
based backup and restore to
+ // blob store based backup and restore, both at the same time.
+ return;
+ }
+
+ Pair<String, SnapshotIndex> scmAndSnapshotIndex =
prevStoreSnapshotIndexes.get(storeName);
+
+ long storeRestoreStartTime = System.nanoTime();
+ SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight();
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+
+ // TODO MINOR shesharm: calculate recursively similar to DirDiff.Stats
+ long bytesToRestore = dirIndex.getFilesPresent().stream().mapToLong(fi
-> fi.getFileMetadata().getSize()).sum();
+
metrics.filesToRestore.getValue().addAndGet(dirIndex.getFilesPresent().size());
+ metrics.bytesToRestore.getValue().addAndGet(bytesToRestore);
+
metrics.filesRemaining.getValue().addAndGet(dirIndex.getFilesPresent().size());
+ metrics.bytesRemaining.getValue().addAndGet(bytesToRestore);
+
+ CheckpointId checkpointId =
snapshotIndex.getSnapshotMetadata().getCheckpointId();
+ File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir,
storeName, taskName, TaskMode.Active);
+ Path storeCheckpointDir =
Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
+ LOG.trace("Got task: {} store: {} local store directory: {} and local
store checkpoint directory: {}",
+ taskName, storeName, storeDir, storeCheckpointDir);
+
+ // we always delete the store dir to preserve transactional state
guarantees.
+ try {
+ LOG.debug("Deleting local store directory: {}. Will be restored from
local store checkpoint directory " +
+ "or remote snapshot.", storeDir);
+ FileUtils.deleteDirectory(storeDir);
+ } catch (IOException e) {
+ throw new SamzaException(String.format("Error deleting store
directory: %s", storeDir), e);
+ }
+
+ boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName,
dirIndex,
+ storeCheckpointDir, storageConfig, blobStoreUtil);
+
+ if (shouldRestore) { // restore the store from the remote blob store
+ LOG.debug("Deleting local store checkpoint directory: {} before
restore.", storeCheckpointDir);
+ // delete all store checkpoint directories. if we only delete the
store directory and don't
+ // delete the checkpoint directories, the store size on disk will grow
to 2x after restore
+ // until the first commit is completed and older checkpoint dirs are
deleted. This is
+ // because the hard-linked checkpoint dir files will no longer be
de-duped with the
+ // now-deleted main store directory contents and will take up
additional space of their
+ // own during the restore.
+ deleteCheckpointDirs(taskName, storeName, loggedBaseDir,
storageManagerUtil);
+
+ enqueueRestore(jobName, jobId, taskName.toString(), storeName,
storeDir, dirIndex, storeRestoreStartTime,
+ restoreFutures, blobStoreUtil, metrics, executor);
+ } else {
+ LOG.debug("Renaming store checkpoint directory: {} to store directory:
{} since its contents are identical " +
+ "to the remote snapshot.", storeCheckpointDir, storeDir);
+ // atomically rename the checkpoint dir to the store dir
+ new FileUtil().move(storeCheckpointDir.toFile(), storeDir);
+
+ // delete any other checkpoint dirs.
+ deleteCheckpointDirs(taskName, storeName, loggedBaseDir,
storageManagerUtil);
+ }
+ });
+
+ // wait for all restores to finish
+ FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
+ LOG.info("Restore completed for task: {} stores", taskName);
+ metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
+ }).join(); // TODO BLOCKER dchen1 make non-blocking.
+ }
+
+ /**
+ * Determines if the store needs to be restored from remote snapshot based
on local and remote state.
+ */
+ @VisibleForTesting
+ static boolean shouldRestore(String taskName, String storeName, DirIndex
dirIndex,
+ Path storeCheckpointDir, StorageConfig storageConfig, BlobStoreUtil
blobStoreUtil) {
+ // if a store checkpoint directory exists for the last successful task
checkpoint, try to use it.
+ boolean restoreStore;
+ if (Files.exists(storeCheckpointDir)) {
+ if (storageConfig.getCleanLoggedStoreDirsOnStart(storeName)) {
Review comment:
Accessors for boolean values should be named `X / isX / shouldX / doX`
etc., not getX etc.
E.g., isSkyBlue makes more sense than getIsSkyBlue, shouldRun makes more
sense than getShouldRun etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]