prateekm commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1273936574
##########
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java:
##########
@@ -93,14 +93,18 @@ public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager
this.metrics = metrics;
}
- public void init() {
+ public void init(Checkpoint checkpoint) {
// Assuming that container storage manager has already started and created to stores
storageEngines = containerStorageManager.getAllStores(taskName);
if (checkpointManager != null) {
- Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+ // In case of standby containers
Review Comment:
Minor: conditions can be simplified. checkpointManager != null only needs to be checked if checkpoint == null.
Also, prefer not to reassign input params; assign to finalCheckpoint instead.
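A minimal sketch of the suggested shape, assuming heavily simplified stand-ins for Checkpoint and CheckpointManager (the two interfaces below are hypothetical reductions, not Samza's real types). The manager is consulted only when no checkpoint was passed in, and the input parameter is never reassigned:

```java
// Hypothetical, reduced stand-ins for Samza's Checkpoint and CheckpointManager.
interface Checkpoint {}

interface CheckpointManager {
  Checkpoint readLastCheckpoint(String taskName);
}

final class CheckpointResolution {
  private CheckpointResolution() {}

  /**
   * Returns the checkpoint to initialize from. The checkpoint manager is read only when
   * no checkpoint was passed in (e.g. standby tasks, which are given null).
   */
  static Checkpoint resolve(Checkpoint checkpoint, CheckpointManager checkpointManager, String taskName) {
    final Checkpoint finalCheckpoint;
    if (checkpoint != null) {
      finalCheckpoint = checkpoint;
    } else if (checkpointManager != null) {
      finalCheckpoint = checkpointManager.readLastCheckpoint(taskName);
    } else {
      finalCheckpoint = null; // no checkpoint available at all
    }
    return finalCheckpoint;
  }
}
```

Declaring finalCheckpoint as final also lets the compiler check that every branch assigns it exactly once.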
##########
samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java:
##########
@@ -49,11 +49,12 @@ public interface BlobStoreManager {
* @param id Blob ID of the blob to get
* @param outputStream OutputStream to write the downloaded blob
* @param metadata User supplied {@link Metadata} of the request
+ * @param getDeletedBlob Flag to indicate if get should try to get a blob marked for deletion but not yet compacted
* @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
* @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
* exceptionally with DeletedException on failure with the blob already deleted error.
*/
- CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);
+ CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, Boolean getDeletedBlob);
Review Comment:
Minor: boolean, to avoid accidentally passing null values
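To illustrate the null risk with a boxed flag, a small sketch using hypothetical methods shaped like get(..., Boolean getDeletedBlob) and get(..., boolean getDeletedBlob). Passing null to the boxed variant compiles but fails at runtime when the flag is unboxed; the primitive variant rejects null at compile time:

```java
final class FlagTypeDemo {
  private FlagTypeDemo() {}

  // Hypothetical method shaped like get(..., Boolean getDeletedBlob).
  static String describeBoxed(String id, Boolean getDeletedBlob) {
    // Auto-unboxing a null Boolean in this condition throws NullPointerException.
    return getDeletedBlob ? "get " + id + " (including deleted)" : "get " + id;
  }

  // Hypothetical method shaped like get(..., boolean getDeletedBlob).
  static String describePrimitive(String id, boolean getDeletedBlob) {
    // describePrimitive(id, null) does not compile, so null can never reach this line.
    return getDeletedBlob ? "get " + id + " (including deleted)" : "get " + id;
  }

  static boolean boxedNullFailsAtRuntime() {
    try {
      describeBoxed("blob-1", null); // compiles fine
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(describePrimitive("blob-1", true));
    System.out.println("null Boolean fails at runtime: " + boxedNullFailsAtRuntime());
  }
}
```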
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
@Override
public void init(Checkpoint checkpoint) {
+ init(checkpoint, false);
+ }
+
+ public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {
Review Comment:
Do we need both init variants / why do we need both to be public?
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -153,6 +158,10 @@ public CompletableFuture<Void> restore() {
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
}
+ public CompletableFuture<Void> restore(Boolean restoreDeleted) {
Review Comment:
Add javadoc to public method. Why do we need both variants of restore?
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -162,15 +167,7 @@ public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
}
try {
- return FutureUtil.toFutureOfMap(t -> {
- Throwable unwrappedException = FutureUtil.unwrapExceptions(CompletionException.class, t);
- if (unwrappedException instanceof DeletedException) {
- LOG.warn("Ignoring already deleted snapshot index for taskName: {}", taskName, t);
- return true;
- } else {
- return false;
- }
- }, storeSnapshotIndexFutures).join();
+ return FutureUtil.toFutureOfMap(storeSnapshotIndexFutures).join();
Review Comment:
Not obvious to me: can you explain the difference in error handling behavior before / after? What does the predicate param to toFutureOfMap do?
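For reference, a hypothetical re-implementation of what a predicate-taking toFutureOfMap plausibly does (an assumption for illustration, not Samza's FutureUtil code): entries whose futures fail with an error the predicate accepts are dropped from the result, and any other failure fails the whole map. Under that reading, the removed predicate (true for DeletedException) made getStoreSnapshotIndexes skip a store whose snapshot index was already deleted, while the new call without a predicate presumably lets that same error fail the call:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;

final class FutureOfMapSketch {
  private FutureOfMapSketch() {}

  /**
   * Hypothetical sketch (not Samza's FutureUtil): waits for all value futures, drops entries
   * whose failure the predicate accepts, and fails the whole result if any other entry failed.
   */
  static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
      Predicate<Throwable> ignoreError, Map<K, CompletableFuture<V>> futures) {
    CompletableFuture<?>[] all = futures.values().toArray(new CompletableFuture<?>[0]);
    // handle() runs once every future is done, whether or not some of them failed.
    return CompletableFuture.allOf(all).handle((ignoredResult, ignoredError) -> {
      Map<K, V> result = new HashMap<>();
      for (Map.Entry<K, CompletableFuture<V>> entry : futures.entrySet()) {
        try {
          result.put(entry.getKey(), entry.getValue().join());
        } catch (CompletionException e) {
          if (!ignoreError.test(e)) {
            throw e; // not ignorable: the returned future completes exceptionally
          }
          // ignorable: leave this key out of the result
        }
      }
      return result;
    });
  }

  /** Without a predicate, any failed value future fails the whole map. */
  static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(Map<K, CompletableFuture<V>> futures) {
    return toFutureOfMap(t -> false, futures);
  }
}
```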
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -329,11 +340,11 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName,
String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures,
BlobStoreUtil blobStoreUtil,
- DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) {
+ DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, Boolean getDeleted) {
Review Comment:
boolean
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
@Override
public void init(Checkpoint checkpoint) {
+ init(checkpoint, false);
Review Comment:
Why false? Document inline.
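One way to document the default inline, sketched with a simplified String checkpoint ID in place of Checkpoint (the class and method bodies are hypothetical). The comment says what false means and when the true variant is used, based on the recovery path in ContainerStorageManagerUtil.initAndRestoreTaskInstances, which retries init with true after a DeletedException:

```java
final class RestoreManagerInitSketch {
  private boolean getDeletedBlob;

  /** Regular startup path. */
  public void init(String checkpointId) {
    // false: a normal start reads only live blobs. Blobs marked for deletion but not yet
    // compacted are requested only on the recovery path, after this init has failed with a
    // DeletedException and the caller retries with init(checkpointId, true).
    init(checkpointId, false);
  }

  /** Recovery path; hypothetical shape of the two-argument overload. */
  public void init(String checkpointId, boolean getDeletedBlob) {
    this.getDeletedBlob = getDeletedBlob;
    // ... read snapshot indexes for checkpointId, fetching deleted blobs only if requested
  }

  boolean isGetDeletedBlob() {
    return getDeletedBlob;
  }
}
```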
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -232,6 +288,15 @@ public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId,
* @return A future that completes when all the async downloads completes
*/
public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, Metadata metadata) {
+ return restoreDir(baseDir, dirIndex, metadata, false);
+ }
+
+ /**
+ * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated
+ * with this remote snapshot. getDeletedFiles flag sets whether to attempt a get for deletedFiles or not.
Review Comment:
Minor: update doc for getDeletedFiles flag (explain why / when to use, not "getDeletedFiles gets deleted files" etc.)
##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -981,7 +981,7 @@ class SamzaContainer(
}
}
- def startStores {
+ def startStores: util.Map[TaskName, Checkpoint] = {
Review Comment:
Add documentation for what the return value represents.
##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -993,10 +993,14 @@ class SamzaContainer(
})
}
- def startTask {
+ def startTask(taskCheckpoints: util.Map[TaskName, Checkpoint]) {
Review Comment:
Add documentation here and later on for what the param represents.
Esp. clarify relationship to startpoints.
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Checkpoint) {
initCaughtUpMapping()
+ val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
if (commitManager != null) {
debug("Starting commit manager for taskName: %s" format taskName)
-
- commitManager.init()
+ commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)
Review Comment:
Minor: Prefer not inlining expressions as params. Assign explicitly.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -208,9 +205,9 @@ public ContainerStorageManager(
}
- public void start() throws SamzaException, InterruptedException {
+ public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedException {
Review Comment:
Document return value.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -153,6 +158,10 @@ public CompletableFuture<Void> restore() {
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
}
+ public CompletableFuture<Void> restore(Boolean restoreDeleted) {
+ return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes,
+ loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
+ }
Review Comment:
Newline after.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,64 +300,14 @@ private void restoreStores() throws InterruptedException {
samzaContainerMetrics, taskInstanceMetrics,
taskInstanceCollectors, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+ taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});
- // Initialize each TaskStorageManager
- taskRestoreManagers.forEach((taskName, restoreManagers) ->
- restoreManagers.forEach((factoryName, taskRestoreManager) ->
- taskRestoreManager.init(taskCheckpoints.get(taskName))
- )
- );
-
- // Start each store consumer once.
- // Note: These consumers are per system and only changelog system store consumers will be started.
- // Some TaskRestoreManagers may not require the consumer to to be started, but due to the agnostic nature of
- // ContainerStorageManager we always start the changelog consumer here in case it is required
- this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
- // Submit restore callable for each taskInstance
- taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
- // Submit for each restore factory
- restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
- long startTime = System.currentTimeMillis();
- String taskName = taskInstance.getTaskName();
- LOG.info("Starting restore for state for task: {}", taskName);
- CompletableFuture<Void> restoreFuture = taskRestoreManager.restore().handle((res, ex) -> {
- // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {} restore",
- taskName, ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be be able to continue processing/backups
- // if restore manager close fails.
- }
-
- long timeToRestore = System.currentTimeMillis() - startTime;
- if (samzaContainerMetrics != null) {
- Gauge taskGauge = samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, null);
-
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
- }
- }
-
- if (ex != null) {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s", taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
- } else {
- return null;
- }
- });
-
- taskRestoreFutures.add(restoreFuture);
- });
- });
+ // Init all taskRestores and if successful, create a future for restores for each task
Review Comment:
"Init all *tasks stores*, and if successful, (concurrently?) restore each
task store".
Minor: Generally, don't say "create a future for", since the Future is just the return type, not the main action. Same reason you'd say "get members" and not "create a list for members".
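The two-phase flow the comment should describe can be sketched as follows, with RestoreManager as a hypothetical, reduced stand-in for TaskRestoreManager. Every init runs first, so a failing init throws before any restore starts; the restores are then started together and their futures returned for the caller to wait on:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class InitThenRestoreSketch {
  private InitThenRestoreSketch() {}

  // Hypothetical, reduced version of a task restore manager.
  interface RestoreManager {
    void init();

    CompletableFuture<Void> restore();
  }

  /**
   * Init all task stores and, if every init succeeds, restore each task store.
   * Each restore() returns a future right away, so restores may overlap depending on
   * how each manager schedules its work.
   */
  static List<CompletableFuture<Void>> initAndRestore(Map<String, RestoreManager> managersByTask) {
    // Phase 1: an exception thrown by any init propagates before any restore is started.
    managersByTask.values().forEach(RestoreManager::init);
    // Phase 2: start all restores and hand back the futures for the caller to wait on.
    List<CompletableFuture<Void>> restores = new ArrayList<>();
    managersByTask.values().forEach(m -> restores.add(m.restore()));
    return restores;
  }
}
```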
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -212,9 +213,17 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
File loggedBaseDir, StorageConfig storageConfig,
BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil,
ExecutorService executor) {
+ return restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, storageConfig,
+ metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false);
+ }
+
+ public static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
Review Comment:
Why 2 variants? Previous method wasn't public. Why does this need to be?
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Checkpoint) {
Review Comment:
Also document that this is nullable (or better, pass a Scala Option instead).
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ public static List<Future<Void>> initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for task: %s with GetDeleted set to true", taskName);
Review Comment:
Failed for restore manager, not for task.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
}
+ // If getDeletedBlobs is enabled - always restore so that we get all the blobs, including the deleted blobs,
+ // immediately restore it locally and backup to create new checkpoint.
boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
- storeCheckpointDir, storageConfig, dirDiffUtil);
+ storeCheckpointDir, storageConfig, dirDiffUtil) || getDeletedBlobs;
Review Comment:
Move getDeletedBlob check to shouldRestore and update javadoc.
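A sketch of moving the flag into shouldRestore, with the existing checks reduced to two hypothetical boolean inputs (the real method also takes storeCheckpointDir, storageConfig and dirDiffUtil and derives its decision from them). The javadoc carries the reason for the override:

```java
final class ShouldRestoreSketch {
  private ShouldRestoreSketch() {}

  /**
   * Decides whether a store must be restored from its remote snapshot.
   *
   * @param alwaysRestoreConfigured hypothetical stand-in for any config-driven "always restore" setting
   * @param localCheckpointMatchesRemote hypothetical stand-in for comparing the local checkpoint
   *        directory with the remote DirIndex
   * @param getDeletedBlobs true when the snapshot references blobs already marked for deletion (but not
   *        yet compacted); the store is then always restored so it can be backed up again under a new
   *        checkpoint
   */
  static boolean shouldRestore(boolean alwaysRestoreConfigured, boolean localCheckpointMatchesRemote,
      boolean getDeletedBlobs) {
    if (getDeletedBlobs) {
      return true;
    }
    if (alwaysRestoreConfigured) {
      return true;
    }
    return !localCheckpointMatchesRemote;
  }
}
```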
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -114,6 +114,11 @@ public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor
*/
public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
String jobName, String jobId, String taskName, Checkpoint checkpoint,
Set<String> storesToBackupOrRestore) {
+ return getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, storesToBackupOrRestore, false);
+ }
+
+ public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(String jobName, String jobId, String taskName,
Review Comment:
Same as other comments re: variants.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
Review Comment:
Same as other comments re: variants.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -183,11 +180,20 @@ public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
* @return a Future containing the {@link SnapshotIndex}
*/
public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) {
+ return getSnapshotIndex(blobId, metadata, false);
+ }
+
+ /**
+ * GETs the {@link SnapshotIndex} from the blob store.
+ * @param blobId blob ID of the {@link SnapshotIndex} to get
+ * @return a Future containing the {@link SnapshotIndex}
+ */
+ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata, Boolean getDeletedBlob) {
Review Comment:
Same as other comments re: variants.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+ SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+ return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+ .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+ .exceptionally(ex -> {
+ if (ex instanceof DeletedException) {
Review Comment:
Document error handling behavior (explain what it should do / is trying to do).
Do you need to unwrap CompletionException here?
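On unwrapping: when a stage earlier in a CompletableFuture chain fails, dependent stages complete with a CompletionException wrapping the original cause, so an exceptionally(...) at the end of cleanUpDir(...).thenComposeAsync(...) typically sees CompletionException rather than DeletedException, and the instanceof check would not match. A small self-contained check, with DeletedException replaced by a hypothetical local stand-in:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class UnwrapDemo {
  private UnwrapDemo() {}

  // Hypothetical stand-in for org.apache.samza.storage.blobstore.exceptions.DeletedException.
  static final class DeletedException extends RuntimeException {
    DeletedException(String msg) {
      super(msg);
    }
  }

  // Walks past CompletionException wrappers to the underlying cause.
  static Throwable unwrap(Throwable t) {
    Throwable cur = t;
    while (cur instanceof CompletionException && cur.getCause() != null) {
      cur = cur.getCause();
    }
    return cur;
  }

  /** Returns {matchesWithoutUnwrap, matchesAfterUnwrap} as seen by a trailing exceptionally(). */
  static boolean[] check() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      CompletableFuture<Void> failed = new CompletableFuture<>();
      failed.completeExceptionally(new DeletedException("blob already deleted"));
      boolean[] result = new boolean[2];
      failed
          .thenComposeAsync(v -> CompletableFuture.<Void>completedFuture(null), executor)
          .exceptionally(ex -> {
            result[0] = ex instanceof DeletedException;          // false: ex is a CompletionException
            result[1] = unwrap(ex) instanceof DeletedException;  // true: the cause is DeletedException
            return null;
          })
          .join();
      return result;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean[] r = check();
    System.out.println("instanceof without unwrap: " + r[0]);
    System.out.println("instanceof after unwrap:   " + r[1]);
  }
}
```

The FutureUtil.unwrapExceptions(CompletionException.class, t) call used in the removed getStoreSnapshotIndexes code would be one way to do the same unwrapping here.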
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+ SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+ return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+ .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+ .exceptionally(ex -> {
+ if (ex instanceof DeletedException) {
+ LOG.warn("DeletedException received on trying to clean up SnapshotIndex {}. Ignoring the error.",
+ snapshotIndexBlobId);
+ return null;
+ }
+ String msg = String.format("Error deleting/cleaning up SnapshotIndex: %s", snapshotIndexBlobId);
Review Comment:
Same as above, add more context.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
Review Comment:
Minor: boolean.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
Review Comment:
Minor: document behavior, not impl. E.g. "determines whether to attempt to
get deleted snapshot index blobs or not." or something.
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Checkpoint) {
initCaughtUpMapping()
+ val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
if (commitManager != null) {
debug("Starting commit manager for taskName: %s" format taskName)
-
- commitManager.init()
+ commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)
Review Comment:
Document what this param is and its relationship to startpoints in CommitManager interface.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+ SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+ return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
Review Comment:
Minor: Blob id (typo)
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+ SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+ return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+ .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+ .exceptionally(ex -> {
+ if (ex instanceof DeletedException) {
+ LOG.warn("DeletedException received on trying to clean up SnapshotIndex {}. Ignoring the error.",
Review Comment:
Any more information we need to log, e.g. storename / taskname etc?
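A sketch of a warning that carries the request context, assuming the fields the diff already reads from Metadata (getJobName, getJobId, getTaskName, getStoreName). RequestInfo is a hypothetical holder used to keep the example self-contained, and String.format stands in for the logger's {} placeholders:

```java
final class CleanupLogContext {
  private CleanupLogContext() {}

  /** Hypothetical holder for the request fields the diff reads from Metadata. */
  static final class RequestInfo {
    final String jobName;
    final String jobId;
    final String taskName;
    final String storeName;

    RequestInfo(String jobName, String jobId, String taskName, String storeName) {
      this.jobName = jobName;
      this.jobId = jobId;
      this.taskName = taskName;
      this.storeName = storeName;
    }
  }

  /** Builds the warning for an already-deleted SnapshotIndex, including job, task and store. */
  static String deletedSnapshotWarning(String snapshotIndexBlobId, RequestInfo request) {
    return String.format(
        "DeletedException received on trying to clean up SnapshotIndex %s for job: %s, jobId: %s, task: %s, store: %s. Ignoring the error.",
        snapshotIndexBlobId, request.jobName, request.jobId, request.taskName, request.storeName);
  }
}
```

The same context would also make the error message in the non-DeletedException branch easier to act on.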
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -397,10 +462,11 @@ public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) {
* @param fileBlobs List of {@link FileBlob}s that constitute this file.
* @param fileToRestore File pointing to the local path where the file will be restored.
* @param requestMetadata {@link Metadata} associated with this request
+ * @param getDeletedFiles Flag that indicates whether to try to get Deleted (but not yet compacted) files.
* @return a future that completes when the file is downloaded and written or if an exception occurs.
*/
@VisibleForTesting
- CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File fileToRestore, Metadata requestMetadata) {
+ CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File fileToRestore, Metadata requestMetadata, Boolean getDeletedFiles) {
Review Comment:
Minor: boolean
##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -139,6 +139,17 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
});
}
+ /**
+ * Helper method to convert: {@code Map<K, CompletionStage<Void>>}
+ * to: {@code CompletableFuture<Void>}
+ *
+ * Returns a future that completes when all value futures complete.
+ * Returned future completes exceptionally if any of the value futures complete exceptionally.
+ */
+ public static CompletableFuture<Void> mapToFuture(Map<String, CompletionStage<Void>> map) {
Review Comment:
Does this need to be different from the method above?
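If a separate helper is kept, one option is a thin wrapper over `CompletableFuture.allOf`, which already has the semantics the Javadoc describes. Widening the parameter to `Map<?, ? extends CompletionStage<?>>` is an assumption of this sketch, not what the PR does; it lets the helper accept the PR's `Map<String, CompletionStage<Void>>` as well as other key and value types. Note that `allOf` waits for every future to finish before completing, even when one has already failed, so it is not fail-fast.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class FutureUtilSketch {
  /**
   * Completes when every value future completes; completes exceptionally if any of them does.
   * Keys are ignored, so any key type works.
   */
  static CompletableFuture<Void> mapToFuture(Map<?, ? extends CompletionStage<?>> map) {
    CompletableFuture<?>[] futures = new CompletableFuture<?>[map.size()];
    int i = 0;
    for (CompletionStage<?> stage : map.values()) {
      futures[i++] = stage.toCompletableFuture();
    }
    // allOf of an empty array returns an already-completed future.
    return CompletableFuture.allOf(futures);
  }
}
```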
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Checkpoint) {
initCaughtUpMapping()
+ val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
if (commitManager != null) {
debug("Starting commit manager for taskName: %s" format taskName)
-
- commitManager.init()
+ commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)
} else {
debug("Skipping commit manager initialization for taskName: %s" format taskName)
}
- if (offsetManager != null) {
- val checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
- // Only required for checkpointV2
- if (checkpoint != null && checkpoint.getVersion == 2) {
- val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
- // call cleanUp on backup managers in case the container previously failed during commit
- // before completing this step
-
- // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
- // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
- // be deleted in the background by the blob store, leading to data loss.
- info("Cleaning up stale state from previous run for taskName: %s" format taskName)
- commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
- }
+ var checkpoint: Checkpoint = lastTaskCheckpoint
+ if (offsetManager != null && isStandByTask) {
Review Comment:
Document _why_ special handling for standby.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
@Override
public void init(Checkpoint checkpoint) {
+ init(checkpoint, false);
+ }
+
+ public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {
Review Comment:
Add javadoc to public method.
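A sketch of what the Javadoc could cover, assuming the flag becomes a primitive `boolean` as suggested elsewhere in this review. `RestoreManagerSketch` and its nested `Checkpoint` are hypothetical stand-ins; the wording about a second call is based on the retry path in `initAndRestoreTaskInstances` and is not a statement that the real `init` is already safe to call twice.

```java
final class RestoreManagerSketch {
  // Local stand-in for org.apache.samza.checkpoint.Checkpoint.
  interface Checkpoint {}

  private boolean getDeletedBlob;

  /**
   * Initializes the restore manager from the task's last checkpoint, without reading blobs
   * that are marked for deletion. Equivalent to {@code init(checkpoint, false)}.
   */
  public void init(Checkpoint checkpoint) {
    init(checkpoint, false);
  }

  /**
   * Initializes the restore manager from the task's last checkpoint.
   *
   * @param checkpoint the last checkpoint for this task
   * @param getDeletedBlob if true, also fetch blobs that are marked for deletion but not yet
   *     compacted. Callers may invoke this with {@code true} after a previous init on the same
   *     instance failed with a DeletedException.
   */
  public void init(Checkpoint checkpoint, boolean getDeletedBlob) {
    this.getDeletedBlob = getDeletedBlob;
  }

  boolean isGetDeletedBlob() {
    return getDeletedBlob;
  }
}
```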
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
}
+ // If getDeletedBlobs is enabled - always restore so that we get all the blobs, including the deleted blobs,
+ // immediately restore it locally and backup to create new checkpoint.
boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
- storeCheckpointDir, storageConfig, dirDiffUtil);
+ storeCheckpointDir, storageConfig, dirDiffUtil) || getDeletedBlobs;
Review Comment:
Move the getDeletedBlob check to the beginning to short-circuit the rest.
Are you sure this should be short-circuiting / overriding all the other checks in shouldRestore? It's not obvious to me. Are there any scenarios where shouldRestore returns false but we want to force a restore?
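The first suggestion amounts to swapping the operands of `||`: Java evaluates `||` left to right and skips the right operand once the left is `true`. In the sketch below, `shouldRestore()` is a hypothetical stand-in for the real multi-argument check, and the counter only exists to show when it runs. This addresses the ordering, not the open question of whether the flag should override those checks at all.

```java
final class RestoreDecisionSketch {
  // Counts how often the regular check actually runs.
  static int regularChecks = 0;

  // Hypothetical stand-in for the existing shouldRestore(...) checks; here it always says "no restore".
  static boolean shouldRestore() {
    regularChecks++;
    return false;
  }

  static boolean decide(boolean getDeletedBlobs) {
    // Flag first: with ||, shouldRestore() is skipped entirely when getDeletedBlobs is true.
    return getDeletedBlobs || shouldRestore();
  }
}
```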
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Checkpoint) {
Review Comment:
See comment above.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ public static List<Future<Void>> initAndRestoreTaskInstances(
Review Comment:
Add Javadoc. Also document behavior w.r.t. deleted blobs.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ public static List<Future<Void>> initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true);
Review Comment:
Can you confirm init is safe to be called twice? Also document in the BSRM init javadoc that this can happen.
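One way to pin this down in a unit test, sketched with hypothetical types: a fake restore manager that records every `init` call and fails the plain init as if the snapshot index blob were already marked for deletion. The loop mirrors the shape of the retry above but simplifies it; the PR catches `SamzaException`, unwraps it with `isUnwrappedExceptionDeletedException`, and checks for `BlobStoreRestoreManager`, all of which are omitted here. The test confirms the second call happens on the same instance; whether the real `init` tolerates it still needs checking against its actual state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class InitRetrySketch {
  // Local stand-in for the blob store DeletedException.
  static final class DeletedException extends RuntimeException {
    DeletedException(String message) {
      super(message);
    }
  }

  // Hypothetical fake: records each init call; a plain init fails if the snapshot is marked deleted.
  static final class FakeRestoreManager {
    final List<Boolean> initCalls = new ArrayList<>();
    private final boolean snapshotMarkedDeleted;

    FakeRestoreManager(boolean snapshotMarkedDeleted) {
      this.snapshotMarkedDeleted = snapshotMarkedDeleted;
    }

    void init(boolean getDeletedBlob) {
      initCalls.add(getDeletedBlob);
      if (snapshotMarkedDeleted && !getDeletedBlob) {
        throw new DeletedException("snapshot index blob marked for deletion");
      }
    }
  }

  // Simplified retry loop: on DeletedException, re-init with getDeletedBlob = true and force a restore.
  static Set<String> initAll(Map<String, FakeRestoreManager> managers) {
    Set<String> forceRestoreTasks = new HashSet<>();
    managers.forEach((taskName, manager) -> {
      try {
        manager.init(false);
      } catch (DeletedException e) {
        manager.init(true); // second init call on the same instance
        forceRestoreTasks.add(taskName);
      }
    });
    return forceRestoreTasks;
  }
}
```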
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -208,9 +205,9 @@ public ContainerStorageManager(
}
- public void start() throws SamzaException, InterruptedException {
+ public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedException {
Review Comment:
Same for later methods.
--
This is an automated message from the Apache Git Service.