shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1286390522
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,64 +300,14 @@ private void restoreStores() throws InterruptedException {
samzaContainerMetrics, taskInstanceMetrics,
taskInstanceCollectors, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config,
clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+ taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});
- // Initialize each TaskStorageManager
- taskRestoreManagers.forEach((taskName, restoreManagers) ->
- restoreManagers.forEach((factoryName, taskRestoreManager) ->
- taskRestoreManager.init(taskCheckpoints.get(taskName))
- )
- );
-
- // Start each store consumer once.
- // Note: These consumers are per system and only changelog system store
consumers will be started.
- // Some TaskRestoreManagers may not require the consumer to to be started,
but due to the agnostic nature of
- // ContainerStorageManager we always start the changelog consumer here in
case it is required
-
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
- // Submit restore callable for each taskInstance
- taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
- // Submit for each restore factory
- restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
- long startTime = System.currentTimeMillis();
- String taskName = taskInstance.getTaskName();
- LOG.info("Starting restore for state for task: {}", taskName);
- CompletableFuture<Void> restoreFuture =
taskRestoreManager.restore().handle((res, ex) -> {
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore",
- taskName, ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be be able to
continue processing/backups
- // if restore manager close fails.
- }
-
- long timeToRestore = System.currentTimeMillis() - startTime;
- if (samzaContainerMetrics != null) {
- Gauge taskGauge =
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance,
null);
-
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
- }
- }
-
- if (ex != null) {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked exception
to throw from lambda
- } else {
- return null;
- }
- });
-
- taskRestoreFutures.add(restoreFuture);
- });
- });
+ // Init all taskRestores and if successful, create a future for restores
for each task
+ List<Future<Void>> taskRestoreFutures =
Review Comment:
Updated: this now returns a map of task names to checkpoints rather than
mutating the input.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]