cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r283537909
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
 ##########
 @@ -356,20 +356,13 @@ class OffsetManager(
 
     // delete corresponding startpoints after checkpoint is supposed to be committed
     if (startpointManager != null && startpoints.contains(taskName)) {
-      val sspStartpoints = checkpoint.getOffsets.keySet.asScala
-        .intersect(startpoints.getOrElse(taskName, Map.empty[SystemStreamPartition, Startpoint]).keySet)
-
-      // delete startpoints for this task and the intersection of SSPs between checkpoint and startpoint.
-      sspStartpoints.foreach(ssp => {
-        startpointManager.deleteStartpoint(ssp, taskName)
-        info("Deleted startpoint for SSP: %s and task: %s" format (ssp, taskName))
-      })
+      info("%d startpoint(s) for taskName: %s have been committed to the checkpoint." format (startpoints.get(taskName).size, taskName.getTaskName))
+      startpointManager.removeFanOutForTask(taskName)
       startpoints -= taskName
 
       if (startpoints.isEmpty) {
-        // Stop startpoint manager after last startpoint is deleted
-        startpointManager.stop()
-        info("No more startpoints left to consume. Stopped the startpoint manager.")
+        info("All outstanding startpoints have been committed to the checkpoint.")
+        startpointManager.stop
 
 Review comment:
   This would have been better as a comment on the previous PR:
   Should the `StartpointManager.stop` javadoc be updated to state that the method is idempotent, so that it stays that way if changes are made in the future?
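
For illustration, a minimal sketch of what a documented, idempotent `stop` could look like. `SketchStartpointManager` and `releaseCount` are hypothetical names used only for this example; they are not part of Samza, and the real `StartpointManager` may manage its resources differently.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for illustration only; not the real StartpointManager.
class SketchStartpointManager {
  private val stopped = new AtomicBoolean(false)

  // Counts how many times the underlying resources were actually released.
  var releaseCount: Int = 0

  /**
   * Stops this manager and releases its resources.
   * This method is idempotent: calls after the first one are no-ops.
   */
  def stop(): Unit = {
    // compareAndSet succeeds only for the first caller, so the release
    // logic below runs at most once even if stop() is invoked repeatedly.
    if (stopped.compareAndSet(false, true)) {
      releaseCount += 1 // placeholder for releasing the underlying resources
    }
  }
}
```

With the contract written into the doc comment, a caller such as `OffsetManager` could invoke `stop` whenever the last startpoint is committed without tracking whether it has already been called.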

