prateekm commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r639015142
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -313,11 +287,44 @@ public int getNumPersistentStores() {
.count();
}
+ public List<String> getStoreBackupManagerClassName(String storeName) {
+ List<String> storeBackupManagers =
getList(String.format(STORE_BACKUP_FACTORIES, storeName), new ArrayList<>());
+ // For backwards compatibility if the changelog is enabled, we use default
kafka backup factory
+ if (storeBackupManagers.isEmpty() &&
getChangelogStream(storeName).isPresent()) {
+ storeBackupManagers = DEFAULT_BACKUP_FACTORIES;
+ }
+ return storeBackupManagers;
+ }
+
+ public Set<String> getStateBackendBackupFactories() {
+ return getStoreNames().stream()
+ .flatMap((storeName) ->
getStoreBackupManagerClassName(storeName).stream())
+ .collect(Collectors.toSet());
+ }
+
+ public String getStateBackendRestoreFactory() {
+ return get(STORE_RESTORE_FACTORY, KAFKA_STATE_BACKEND_FACTORY);
+ }
+
+ public List<String> getStoresWithBackupFactory(String backendFactoryName) {
+ return getStoreNames().stream()
+ .filter((storeName) -> getStoreBackupManagerClassName(storeName)
+ .contains(backendFactoryName))
+ .collect(Collectors.toList());
+ }
+
+ // TODO BLOCKER dchen update when making restore managers per store
+ public List<String> getStoresWithRestoreFactory(String backendFactoryName) {
+ return getStoreNames().stream()
+ .filter((storeName) ->
getStateBackendRestoreFactory().equals(backendFactoryName))
+ .collect(Collectors.toList());
+ }
+
/**
* Helper method to get if logged store dirs should be deleted regardless of
their contents.
* @return
*/
- public boolean getCleanLoggedStoreDirsOnStart(String storeName) {
+ public boolean isCleanLoggedStoreDirsOnStart(String storeName) {
Review comment:
Follow Up: s/isCleanLoggedStoreDirsOnStart/cleanLoggedStoreDirsOnStart
##########
File path: samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/**
+ * Config related helper methods for BlobStore.
+ */
+public class BlobStoreConfig extends MapConfig {
+
+ public static final String BLOB_STORE_MANAGER_FACTORY =
"blob.store.manager.factory";
+ public static final String BLOB_STORE_ADMIN_FACTORY =
"blob.store.admin.factory";
+ public BlobStoreConfig(Config config) {
+ super(config);
+ }
+
+
+ public String getBlobStoreManagerFactory() {
+ // TODO BLOCKER dchen validate that if blob store state backend is
configured for use this config is also set.
Review comment:
Minor: Remove TODO.
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -66,13 +66,14 @@
public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX +
"%s.changelog." + MIN_COMPACTION_LAG_MS;
public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS =
TimeUnit.HOURS.toMillis(4);
- public static final String DEFAULT_STATE_BACKEND_FACTORY =
"org.apache.samza.storage.KafkaChangelogStateBackendFactory";
- public static final String STORE_BACKEND_BACKUP_FACTORIES = STORE_PREFIX +
"%s.state.backend.backup.factories";
- public static final List<String> DEFAULT_STATE_BACKEND_BACKUP_FACTORIES =
ImmutableList.of(
- DEFAULT_STATE_BACKEND_FACTORY);
- public static final String STATE_BACKEND_RESTORE_FACTORY = STORE_PREFIX +
"state.restore.backend";
public static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+ public static final String KAFKA_STATE_BACKEND_FACTORY =
+ "org.apache.samza.storage.KafkaChangelogStateBackendFactory";
+ public static final List<String> DEFAULT_BACKUP_FACTORIES = ImmutableList.of(
Review comment:
Follow Up: Remove this default and use the value explicitly in
getStoreBackupManagerClassName
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -719,7 +745,11 @@ class SamzaContainer(
private val jobConfig = new JobConfig(config)
private val taskConfig = new TaskConfig(config)
- val shutdownMs: Long = taskConfig.getShutdownMs
+
+ // Linkedin-specific shutdownMs due to SAMZA-2198; switch back to
TaskConfig.getShutdownMs once that is fixed
+ val shutdownMs: Long = taskConfig.getLong(TaskConfig.TASK_SHUTDOWN_MS, 5000)
Review comment:
BLOCKER: @shekhars-li
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -313,11 +287,44 @@ public int getNumPersistentStores() {
.count();
}
+ public List<String> getStoreBackupManagerClassName(String storeName) {
Review comment:
Follow Up:
s/getStoreBackupManagerClassName/getStoreBackupFactory
s/getStateBackendBackupFactories/getBackupFactories
s/getStateBackendRestoreFactory/getRestoreFactory
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+ private final JobModel jobModel;
+ private final ExecutorService executor;
+ private final String jobName;
+ private final String jobId;
+ private final ContainerModel containerModel;
+ private final TaskModel taskModel;
+ private final String taskName;
+ private final Config config;
+ private final Clock clock;
+ private final StorageManagerUtil storageManagerUtil;
+ private final List<String> storesToBackup;
+ private final File loggedStoreBaseDir;
+ private final BlobStoreUtil blobStoreUtil;
+
+ private final BlobStoreBackupManagerMetrics metrics;
+
+ /**
+ * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the
corresponding {@link SnapshotIndex} from
+ * last successful task checkpoint or {@link #upload}.
+ *
+ * After {@link #init}, the map reflects the contents of the last completed
checkpoint for the task from the previous
+ * deployment, if any.
+ *
+ * During regular processing, this map is updated after each successful
{@link #upload} with the blob id of
+ * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the
upload.
+ *
+ * The contents of this map are used to:
+ * 1. Delete any unused stores from the previous deployment in the remote
store during {@link #init}.
+ * 2. Calculate the diff for local state between the last and the current
checkpoint during {@link #upload}.
+ *
+ * Since the task commit process guarantees that the async stage of the
previous commit is complete before another
+ * commit can start, this future is guaranteed to be complete in the call to
{@link #upload} during the next commit.
+ *
+ * This field is non-final, since the future itself is replaced in its
entirety after init/upload.
+ * The internal map contents are never directly modified (e.g. using puts).
It's volatile to ensure visibility
+ * across threads since the map assignment may happen on a different thread
than the one reading the contents.
+ */
+ private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+ prevStoreSnapshotIndexesFuture;
+
+ public BlobStoreBackupManager(JobModel jobModel, ContainerModel
containerModel, TaskModel taskModel,
+ ExecutorService backupExecutor, BlobStoreBackupManagerMetrics
blobStoreTaskBackupMetrics, Config config,
+ Clock clock, File loggedStoreBaseDir, StorageManagerUtil
storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+ this.jobModel = jobModel;
+ this.jobName = new JobConfig(config).getName().get();
+ this.jobId = new JobConfig(config).getJobId();
+ this.containerModel = containerModel;
+ this.taskModel = taskModel;
+ this.taskName = taskModel.getTaskName().getTaskName();
+ this.executor = backupExecutor;
+ this.config = config;
+ this.clock = clock;
+ this.storageManagerUtil = storageManagerUtil;
+ StorageConfig storageConfig = new StorageConfig(config);
+ this.storesToBackup =
+
storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ this.loggedStoreBaseDir = loggedStoreBaseDir;
+ this.blobStoreUtil = blobStoreUtil;
+ this.prevStoreSnapshotIndexesFuture =
CompletableFuture.completedFuture(ImmutableMap.of());
+ this.metrics = blobStoreTaskBackupMetrics;
+ metrics.initStoreMetrics(storesToBackup);
+ }
+
+ @Override
+ public void init(Checkpoint checkpoint) {
+ long startTime = System.nanoTime();
+ LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+ // Note: blocks the caller (main) thread.
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId,
taskName, checkpoint, blobStoreUtil);
+ this.prevStoreSnapshotIndexesFuture =
+
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+ metrics.initNs.set(System.nanoTime() - startTime);
+ }
+
+ @Override
+ public Map<String, String> snapshot(CheckpointId checkpointId) {
+ // No-op. Stores are flushed and checkpoints are created by commit manager
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> upload(CheckpointId
checkpointId, Map<String, String> storeSCMs) {
+ long uploadStartTime = System.nanoTime();
+
+ // reset gauges for each upload
+ metrics.filesToUpload.getValue().set(0L);
+ metrics.bytesToUpload.getValue().set(0L);
+ metrics.filesUploaded.getValue().set(0L);
+ metrics.bytesUploaded.getValue().set(0L);
+ metrics.filesRemaining.getValue().set(0L);
+ metrics.bytesRemaining.getValue().set(0L);
+ metrics.filesToRetain.getValue().set(0L);
+ metrics.bytesToRetain.getValue().set(0L);
+
+ Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+ storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+ Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new
HashMap<>();
+
+ storesToBackup.forEach((storeName) -> {
+ long storeUploadStartTime = System.nanoTime();
+ try {
+ // metadata for the current store snapshot to upload
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
+
+ // Only durable/persistent stores are passed here from commit manager
+ // get the local store dir corresponding to the current checkpointId
+ File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir,
storeName,
+ taskModel.getTaskName(), taskModel.getTaskMode());
+ String checkpointDirPath =
storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+ File checkpointDir = new File(checkpointDirPath);
+
+ LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+ taskName, storeName, storeDir, checkpointDir);
+
+ // get the previous store directory contents
+ DirIndex prevDirIndex;
+
+ // guaranteed to be available since a new task commit may not start
until the previous one is complete
+ Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+ prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+ if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+ prevDirIndex =
prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+ } else {
+ // no previous SnapshotIndex means that this is the first commit for
this store. Create an empty DirIndex.
+ prevDirIndex = new DirIndex(checkpointDir.getName(),
Collections.emptyList(), Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList());
+ }
+
+ long dirDiffStartTime = System.nanoTime();
+ // get the diff between previous and current store directories
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
BlobStoreUtil.areSameFile(false));
+ metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() -
dirDiffStartTime);
+
+ DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+ updateStoreDiffMetrics(storeName, stats);
+ metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesToUpload.getValue().addAndGet(stats.bytesAdded);
+ metrics.filesRemaining.getValue().addAndGet(stats.filesAdded);
+ metrics.bytesRemaining.getValue().addAndGet(stats.bytesAdded);
+ metrics.filesToRetain.getValue().addAndGet(stats.filesRetained);
+ metrics.bytesToRetain.getValue().addAndGet(stats.bytesRetained);
+
+ // upload the diff to the blob store and get the new directory index
+ CompletionStage<DirIndex> dirIndexFuture =
blobStoreUtil.putDir(dirDiff, snapshotMetadata);
+
+ CompletionStage<SnapshotIndex> snapshotIndexFuture =
+ dirIndexFuture.thenApplyAsync(dirIndex -> {
+ LOG.trace("Dir upload complete. Returning new SnapshotIndex for
task: {} store: {}.", taskName, storeName);
+ Optional<String> prevSnapshotIndexBlobId =
+ Optional.ofNullable(prevStoreSnapshotIndexes.get(storeName))
+ .map(Pair::getLeft);
+ return new SnapshotIndex(clock.currentTimeMillis(),
snapshotMetadata, dirIndex, prevSnapshotIndexBlobId);
+ }, executor);
+
+ // upload the new snapshot index to the blob store and get its blob id
+ CompletionStage<String> snapshotIndexBlobIdFuture =
+ snapshotIndexFuture
+ .thenComposeAsync(si -> {
+ LOG.trace("Uploading Snapshot index for task: {} store: {}",
taskName, storeName);
+ return blobStoreUtil.putSnapshotIndex(si);
+ }, executor);
+
+ // update the temporary storeName to previous snapshot index map with
the new mapping.
+ CompletableFuture<Pair<String, SnapshotIndex>>
scmAndSnapshotIndexPairFuture =
+ FutureUtil.toFutureOfPair(
+ Pair.of(snapshotIndexBlobIdFuture.toCompletableFuture(),
snapshotIndexFuture.toCompletableFuture()));
+
+ scmAndSnapshotIndexPairFuture.whenComplete((res, ex) -> {
+ long uploadTimeNs = System.nanoTime() - storeUploadStartTime;
+ metrics.storeUploadNs.get(storeName).update(uploadTimeNs);
+ });
+
+ storeToSCMAndSnapshotIndexPairFutures.put(storeName,
scmAndSnapshotIndexPairFuture);
+ storeToSerializedSCMFuture.put(storeName,
snapshotIndexBlobIdFuture.toCompletableFuture());
+ } catch (Exception e) {
+ throw new SamzaException(
+ String.format("Error uploading store snapshot to blob store for
task: %s, store: %s, checkpointId: %s",
+ taskName, storeName, checkpointId), e);
+ }
+ });
+
+ // replace the previous storeName to snapshot index mapping with the new
mapping.
+ this.prevStoreSnapshotIndexesFuture =
+ FutureUtil.toFutureOfMap(storeToSCMAndSnapshotIndexPairFutures);
+
+ return FutureUtil.toFutureOfMap(storeToSerializedSCMFuture)
+ .whenComplete((res, ex) -> metrics.uploadNs.update(System.nanoTime() -
uploadStartTime));
+ }
+
+ /**
+ * Clean up would be called at the end of every commit as well as on a
container start/restart.
+ * Clean up involves the following steps:
+ * 1. Remove TTL of the snapshot index blob and for any associated files and
sub-dirs marked for retention.
+ * 2. Delete the files/subdirs marked for deletion in the snapshot index.
+ * 3. Delete the remote {@link SnapshotIndex} blob for the previous
checkpoint.
+ * @param checkpointId the {@link CheckpointId} of the last successfully
committed checkpoint.
+ * @param storeSCMs store name to state checkpoint markers for the last
successfully committed checkpoint
+ */
+ @Override
+ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId,
Map<String, String> storeSCMs) {
+ long startTime = System.nanoTime();
+ List<CompletionStage<Void>> removeTTLFutures = new ArrayList<>();
+ List<CompletionStage<Void>> cleanupRemoteSnapshotFutures = new
ArrayList<>();
+ List<CompletionStage<Void>> removePrevRemoteSnapshotFutures = new
ArrayList<>();
+
+ // SCM, in case of blob store backup and restore, is just the blob id of
SnapshotIndex representing the remote snapshot
+ storeSCMs.forEach((storeName, snapshotIndexBlobId) -> {
Review comment:
Follow Up, potentially still a blocker: I meant you should document why
this check exists inline, and add unit and integration tests for this as well.
It will be easy to accidentally refactor away later.
More important than the commit failing is the fact that cleanUp is called within
TaskInstance#init, and should cause container startup to fail, and continue
failing on subsequent restarts.
I'm wondering now why we didn't see this behavior during test deployments.
Is there some behavior that will cause subsequent restarts to succeed? Let's
try to verify our assumptions with an integration test and then implement
the fix.
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -66,13 +66,14 @@
public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX +
"%s.changelog." + MIN_COMPACTION_LAG_MS;
public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS =
TimeUnit.HOURS.toMillis(4);
- public static final String DEFAULT_STATE_BACKEND_FACTORY =
"org.apache.samza.storage.KafkaChangelogStateBackendFactory";
- public static final String STORE_BACKEND_BACKUP_FACTORIES = STORE_PREFIX +
"%s.state.backend.backup.factories";
- public static final List<String> DEFAULT_STATE_BACKEND_BACKUP_FACTORIES =
ImmutableList.of(
- DEFAULT_STATE_BACKEND_FACTORY);
- public static final String STATE_BACKEND_RESTORE_FACTORY = STORE_PREFIX +
"state.restore.backend";
public static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+ public static final String KAFKA_STATE_BACKEND_FACTORY =
+ "org.apache.samza.storage.KafkaChangelogStateBackendFactory";
+ public static final List<String> DEFAULT_BACKUP_FACTORIES = ImmutableList.of(
+ KAFKA_STATE_BACKEND_FACTORY);
+ public static final String STORE_BACKUP_FACTORIES = STORE_PREFIX +
"%s.backup.factories";
+ public static final String STORE_RESTORE_FACTORY = STORE_PREFIX +
"restore.factory";
Review comment:
BLOCKER: Add a `TODO HIGH dxichen` to make this per store.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/DeletedException.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.exceptions;
+
+public class DeletedException extends RuntimeException {
Review comment:
+1 to Dan's comment - let's add this to the `@throws` sections in the
BlobStoreManager javadocs instead. Also rephrase: this exception should not be
"thrown" synchronously by the manager. Futures should be completed
exceptionally with this exception on failure.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]