dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r283565142
##########
File path:
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
##########
@@ -356,20 +356,13 @@ class OffsetManager(
// delete corresponding startpoints after checkpoint is supposed to be committed
if (startpointManager != null && startpoints.contains(taskName)) {
- val sspStartpoints = checkpoint.getOffsets.keySet.asScala
- .intersect(startpoints.getOrElse(taskName, Map.empty[SystemStreamPartition, Startpoint]).keySet)
-
- // delete startpoints for this task and the intersection of SSPs between checkpoint and startpoint.
- sspStartpoints.foreach(ssp => {
- startpointManager.deleteStartpoint(ssp, taskName)
- info("Deleted startpoint for SSP: %s and task: %s" format (ssp, taskName))
- })
+ info("%d startpoint(s) for taskName: %s have been committed to the checkpoint." format (startpoints.get(taskName).size, taskName.getTaskName))
+ startpointManager.removeFanOutForTask(taskName)
startpoints -= taskName
if (startpoints.isEmpty) {
- // Stop startpoint manager after last startpoint is deleted
- startpointManager.stop()
- info("No more startpoints left to consume. Stopped the startpoint manager.")
+ info("All outstanding startpoints have been committed to the checkpoint.")
+ startpointManager.stop
Review comment:
`.stop` is no longer required if we completely remove the metadata store
creation from the startpoint manager.
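
To illustrate the reasoning, here is a minimal sketch of the ownership idea, assuming the manager receives an already-created store instead of building one itself. `MetadataStore`, `InMemoryStore`, `BorrowingStartpointManager`, and `OwnershipSketch` are hypothetical stand-ins written for this example, not Samza's actual classes or signatures; only the method name `removeFanOutForTask` is borrowed from the diff above, and its `String` parameter is an assumption.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metadata store; not Samza's real interface.
trait MetadataStore {
  def delete(key: String): Unit
  def close(): Unit
}

// Simple in-memory store used only to make the sketch runnable.
final class InMemoryStore extends MetadataStore {
  private val data = mutable.Map.empty[String, String]
  var closed: Boolean = false

  def put(key: String, value: String): Unit = { data.put(key, value); () }
  def contains(key: String): Boolean = data.contains(key)
  def delete(key: String): Unit = { data.remove(key); () }
  def close(): Unit = { closed = true }
}

// The manager only borrows the store, so it holds no resource of its own
// and exposes no stop() method.
final class BorrowingStartpointManager(store: MetadataStore) {
  def removeFanOutForTask(taskName: String): Unit =
    store.delete(s"fanout/$taskName")
}

object OwnershipSketch {
  // Returns (fan-out entry removed, store closed by its owner).
  def run(): (Boolean, Boolean) = {
    val store = new InMemoryStore                 // created by the caller
    store.put("fanout/task-0", "offset=42")

    val manager = new BorrowingStartpointManager(store)
    manager.removeFanOutForTask("task-0")         // no manager.stop afterwards
    val removed = !store.contains("fanout/task-0")

    store.close()                                 // owner closes what it created
    (removed, store.closed)
  }
}
```

Under that assumption, whoever creates the store is also the one that closes it, which is why the caller would have nothing left to stop on the manager.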