prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594678327



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state
+   * for each of the task backup managers. Deletes all the directories of checkpoints older than the
+   * latestCheckpointId.
+   *
    * @param latestCheckpointId CheckpointId of the most recent successful commit
    * @param stateCheckpointMarkers map of map(stateBackendFactoryName to map(storeName to state checkpoint markers) from
-   *                              the latest commit
+   *                               the latest commit
    */
   public void cleanUp(CheckpointId latestCheckpointId, Map<String, Map<String, String>> stateCheckpointMarkers) {
     // Call cleanup on each backup manager
     stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> {
       if (stateBackendToBackupManager.containsKey(factoryName)) {
+        LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName);
         TaskBackupManager backupManager = stateBackendToBackupManager.get(factoryName);
-        if (backupManager != null) {
-          backupManager.cleanUp(latestCheckpointId, storeSCMs);
-        }
-        // TODO HIGH dchen when is it ok for backupmanager to be null? should we throw an exception and fail loudly?
+        backupManager.cleanUp(latestCheckpointId, storeSCMs);
       } else {
-        // throw an error and fail instead?
-        LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName);
+        throw new SamzaException(String.format("Checkpointed factory %s not found or initiated for task name %s",

Review comment:
       It may be better to keep this as a warning. For example, what happens when we go from state backends == (kafka, ambry) to ambry only? Let's document the use case in which this is acceptable.
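A minimal sketch of the warn-and-skip alternative, assuming the scenario above: the last checkpoint still carries markers from a backend (kafka) that was removed from the job, so no backup manager exists for it. `CleanupSketch`, its nested `BackupManager` interface, the string checkpoint IDs, and the `"kafka"`/`"ambry"` map keys are hypothetical stand-ins, not Samza's actual `TaskStorageCommitManager` or `TaskBackupManager` API (real factory keys are likely factory class names); `java.util.logging` replaces the project's logger only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical sketch: cleanUp that warns on unknown factories instead of throwing. */
public class CleanupSketch {
  private static final Logger LOG = Logger.getLogger(CleanupSketch.class.getName());

  /** Hypothetical stand-in for TaskBackupManager; only cleanUp is modeled. */
  interface BackupManager {
    void cleanUp(String latestCheckpointId, Map<String, String> storeSCMs);
  }

  private final Map<String, BackupManager> stateBackendToBackupManager;

  CleanupSketch(Map<String, BackupManager> stateBackendToBackupManager) {
    this.stateBackendToBackupManager = stateBackendToBackupManager;
  }

  /**
   * Cleans up every backend that has a configured backup manager and returns the
   * factory names that were skipped because no manager is configured for them.
   */
  List<String> cleanUp(String latestCheckpointId,
      Map<String, Map<String, String>> stateCheckpointMarkers) {
    List<String> skipped = new ArrayList<>();
    stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> {
      BackupManager backupManager = stateBackendToBackupManager.get(factoryName);
      if (backupManager != null) {
        backupManager.cleanUp(latestCheckpointId, storeSCMs);
      } else {
        // The checkpoint still holds markers written by a backend that is no longer
        // configured (e.g. kafka after moving from (kafka, ambry) to ambry only).
        // No local manager can clean those up, so log a warning and continue.
        LOG.warning(String.format("Ignored cleanup for scm: %s due to unknown factory: %s",
            storeSCMs, factoryName));
        skipped.add(factoryName);
      }
    });
    return skipped;
  }

  public static void main(String[] args) {
    Map<String, BackupManager> managers = new HashMap<>();
    managers.put("ambry", (id, scms) -> System.out.println("ambry cleanUp up to " + id));

    Map<String, Map<String, String>> markers = new HashMap<>();
    markers.put("ambry", Map.of("store1", "blob-123"));
    markers.put("kafka", Map.of("store1", "offset-42"));

    List<String> skipped = new CleanupSketch(managers).cleanUp("ckpt-2", markers);
    System.out.println("skipped: " + skipped);
  }
}
```

In the `main` demo, the ambry markers are passed to the ambry manager, while the kafka entry is only logged and reported as skipped. This sketch does not distinguish a backend that was intentionally removed from the config from one that failed to initialize; that distinction is what the documented use case would need to pin down.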




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
