cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r283537909
##########
File path:
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
##########
@@ -356,20 +356,13 @@ class OffsetManager(
// delete corresponding startpoints after checkpoint is supposed to be committed
if (startpointManager != null && startpoints.contains(taskName)) {
- val sspStartpoints = checkpoint.getOffsets.keySet.asScala
- .intersect(startpoints.getOrElse(taskName, Map.empty[SystemStreamPartition, Startpoint]).keySet)
-
- // delete startpoints for this task and the intersection of SSPs between checkpoint and startpoint.
- sspStartpoints.foreach(ssp => {
- startpointManager.deleteStartpoint(ssp, taskName)
- info("Deleted startpoint for SSP: %s and task: %s" format (ssp, taskName))
- })
+ info("%d startpoint(s) for taskName: %s have been committed to the checkpoint." format (startpoints.get(taskName).size, taskName.getTaskName))
+ startpointManager.removeFanOutForTask(taskName)
startpoints -= taskName
if (startpoints.isEmpty) {
- // Stop startpoint manager after last startpoint is deleted
- startpointManager.stop()
- info("No more startpoints left to consume. Stopped the startpoint manager.")
+ info("All outstanding startpoints have been committed to the checkpoint.")
+ startpointManager.stop
Review comment:
This would have been better as a comment on the previous PR:
Should the `StartpointManager.stop` javadoc be updated to state that the
method is idempotent, so that future changes preserve that behavior?
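
To illustrate the kind of contract meant here, a minimal sketch of a documented idempotent `stop`. `IdempotentStopSketch` and all of its members are hypothetical stand-ins for illustration, not the actual `StartpointManager` code or API:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a manager with start/stop lifecycle (not the real StartpointManager). */
class IdempotentStopSketch {
  private final AtomicBoolean started = new AtomicBoolean(false);
  // Counts how often resources were actually released, only so the sketch can be checked.
  private final AtomicInteger releaseCount = new AtomicInteger(0);

  /** Marks the manager as started. */
  void start() {
    started.set(true);
  }

  /**
   * Stops the manager and releases its resources.
   *
   * <p>This method is idempotent: calling it on a manager that is already
   * stopped (or was never started) has no effect.
   */
  void stop() {
    // Only the call that flips started from true to false performs the release.
    if (started.compareAndSet(true, false)) {
      releaseCount.incrementAndGet(); // stand-in for closing the underlying store
    }
  }

  boolean isStarted() {
    return started.get();
  }

  int releaseCount() {
    return releaseCount.get();
  }

  public static void main(String[] args) {
    IdempotentStopSketch manager = new IdempotentStopSketch();
    manager.start();
    manager.stop();
    manager.stop(); // second call is a no-op
    System.out.println("releases: " + manager.releaseCount());
  }
}
```

With the guarantee written into the javadoc, a caller such as `OffsetManager` could call `stop` whenever the startpoints map becomes empty without tracking whether the manager was already stopped.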