shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1286275959


##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {
     initCaughtUpMapping()
 
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
     if (commitManager != null) {
       debug("Starting commit manager for taskName: %s" format taskName)
-
-      commitManager.init()
+      commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)
     } else {
       debug("Skipping commit manager initialization for taskName: %s" format taskName)
     }
 
-    if (offsetManager != null) {
-      val checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
-      // Only required for checkpointV2
-      if (checkpoint != null && checkpoint.getVersion == 2) {
-        val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
-        // call cleanUp on backup managers in case the container previously failed during commit
-        // before completing this step
-
-        // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
-        // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
-        // be deleted in the background by the blob store, leading to data loss.
-        info("Cleaning up stale state from previous run for taskName: %s" format taskName)
-        commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
-      }
+    var checkpoint: Checkpoint = lastTaskCheckpoint
+    if (offsetManager != null && isStandByTask) {

Review Comment:
   Added a comment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to