shekhars-li commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297734327


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -319,6 +319,19 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
       Map<TaskName, Checkpoint> newTaskCheckpoints = initRestoreAndNewCheckpointFuture.get();
       taskCheckpoints.putAll(newTaskCheckpoints);
       LOG.debug("Post init and restore checkpoints is: {}. NewTaskCheckpoints are: {}", taskCheckpoints, newTaskCheckpoints);
+      // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+      // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).

Review Comment:
   You're right. Fixed it by calling taskRestoreManager.close() after each
taskRestoreManager.restore() completes (with or without an exception). Also
added a test to verify that close() is not called prematurely.
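
For illustration only, the close-after-restore ordering described in this comment could be sketched roughly as below. This is not the code from the PR: `RestoreThenClose`, the `RestoreManager` interface, and `restoreAll` are hypothetical stand-ins, and the sketch assumes each restore runs as its own task on an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

public class RestoreThenClose {

  /** Hypothetical stand-in for a per-task restore manager; not the Samza interface. */
  public interface RestoreManager {
    void restore() throws Exception;
    void close();
  }

  /**
   * Runs restore() for every manager on the given executor. Each manager is closed
   * on the same worker thread right after its own restore() returns or throws,
   * so close() never runs before that manager's restore has finished.
   */
  public static CompletableFuture<Void> restoreAll(List<RestoreManager> managers, Executor executor) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (RestoreManager manager : managers) {
      futures.add(CompletableFuture.runAsync(() -> {
        try {
          manager.restore();
        } catch (Exception e) {
          // Surface the restore failure through the future.
          throw new CompletionException(e);
        } finally {
          // Runs with or without an exception from restore().
          manager.close();
        }
      }, executor));
    }
    // Completes once every restore-and-close task is done; fails if any of them failed.
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
  }
}
```

Because close() runs inside each task, closes for different managers can overlap in time, in the same spirit as the parallel stop() mentioned in the diff comment above.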



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
