[https://issues.apache.org/jira/browse/SAMZA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14011638#comment-14011638]
Chris Riccomini commented on SAMZA-267:
---------------------------------------
Regarding which strategy to pick, we always have SAMZA-180 as a fallback: users
can force-set their checkpoints if they want to remove outdated streams, or
force specific offsets.
The most intuitive behavior, to me, is to strip outdated streams from future
checkpoints.
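A minimal Scala sketch of "strip outdated streams from future checkpoints", assuming a checkpoint can be modelled as a plain map from partition to offset string. The `SystemStream`/`SystemStreamPartition` case classes and `stripOutdatedStreams` are hypothetical simplifications for illustration, not Samza's actual API.

```scala
// Hypothetical, simplified stand-ins for Samza's types (not the real API).
final case class SystemStream(system: String, stream: String)
final case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

object CheckpointStripping {
  /**
   * Drops checkpointed offsets whose stream is no longer listed in task.inputs,
   * logging a warning for each dropped partition instead of failing.
   */
  def stripOutdatedStreams(
      checkpoint: Map[SystemStreamPartition, String],
      inputs: Set[SystemStream]): Map[SystemStreamPartition, String] = {
    // Split the checkpoint into entries for current inputs and stale entries.
    val (kept, outdated) = checkpoint.partition { case (ssp, _) =>
      inputs.contains(ssp.systemStream)
    }
    outdated.keys.foreach { ssp =>
      System.err.println(s"WARN: dropping checkpointed offset for $ssp; stream is not in task.inputs")
    }
    kept
  }
}
```

With `task.inputs` changed from `foo` to `bar`, an old `foo` entry is dropped with a warning and only offsets for current inputs would be written into the next checkpoint.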
> OffsetManager fails if a checkpointed topic isn't in task.inputs
> ----------------------------------------------------------------
>
> Key: SAMZA-267
> URL: https://issues.apache.org/jira/browse/SAMZA-267
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Fix For: 0.7.0
>
>
> If you run a job with task.inputs=foo, let the job checkpoint, then restart
> it with task.inputs=bar, the last checkpoint will have foo in it. This will
> cause the OffsetManager to fail with:
> {noformat}
> 2014-05-15 12:16:28 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to reset a stream that doesn't have offset settings SystemStream [system=kafka, stream=foo].
> at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
> at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:305)
> at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:302)
> at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
> at org.apache.samza.checkpoint.OffsetManager.getSystemStreamPartitionsToReset(OffsetManager.scala:302)
> at org.apache.samza.checkpoint.OffsetManager.stripResetStreams(OffsetManager.scala:287)
> at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:165)
> at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> We should just warn in this case, rather than fail the container.
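A rough Scala sketch of "warn rather than fail", assuming the reset check filters the checkpointed partitions and looks up each stream's offset settings, as the trace suggests (`getSystemStreamPartitionsToReset` filtering a key set and calling `getOrElse`). `OffsetSetting`, `ResetFilter.partitionsToReset`, and the simplified stream types are hypothetical names for illustration, not Samza's real classes.

```scala
// Hypothetical, simplified stand-ins for Samza's types (not the real API).
final case class SystemStream(system: String, stream: String)
final case class SystemStreamPartition(systemStream: SystemStream, partition: Int)
// Hypothetical per-stream offset setting: whether the stream's offset should be reset.
final case class OffsetSetting(resetOffset: Boolean)

object ResetFilter {
  /**
   * Returns the checkpointed partitions whose stream is configured to reset.
   * A stream with no offset settings (e.g. one dropped from task.inputs) is
   * logged and skipped rather than raising an exception.
   */
  def partitionsToReset(
      checkpointed: Set[SystemStreamPartition],
      offsetSettings: Map[SystemStream, OffsetSetting]): Set[SystemStreamPartition] =
    checkpointed.filter { ssp =>
      offsetSettings.get(ssp.systemStream) match {
        case Some(setting) => setting.resetOffset
        case None =>
          System.err.println(
            s"WARN: ignoring checkpointed stream ${ssp.systemStream}; it has no offset settings")
          false
      }
    }
}
```

The design choice is simply to replace the throwing branch of the lookup with a logged `false`, so a stale stream is excluded from the reset set and the container keeps starting.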
--
This message was sent by Atlassian JIRA
(v6.2#6252)