[ 
https://issues.apache.org/jira/browse/SAMZA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14011579#comment-14011579
 ] 

Chris Riccomini commented on SAMZA-267:
---------------------------------------

We need to decide if we want to keep outdated stream offsets in the checkpoint. 
If we keep the outdated stream offsets in the checkpoint, it means that 
developers can switch task.inputs back later, and resume where they left off 
with the old stream. If we strip all outdated (no longer in task.inputs) 
streams out of the checkpoint, then once a new checkpoint is written, switching 
back to a previously-consumed stream will result in using the 
samza.offset.default setting.

I think there are use cases for both behaviors, but I don't want to add 
yet-another-config to control this. The easiest fix is to just strip out all 
outdated streams from systemStreamPartitions and lastProcessedOffsets. This 
would result in stripping all outdated streams from future checkpoints.
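
As a rough illustration of the "strip everything outdated" option, here is a 
minimal sketch. It assumes lastProcessedOffsets is a map keyed by 
SystemStreamPartition; the case classes, the StripOutdatedStreams object, and 
the stripOutdated helper are hypothetical stand-ins for this example, not the 
actual OffsetManager code.

```scala
// Hypothetical stand-ins for Samza's SystemStream / SystemStreamPartition.
case class SystemStream(system: String, stream: String)
case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

object StripOutdatedStreams {
  // Keep only the offsets whose stream is still listed in task.inputs.
  // Anything dropped here will not be written to future checkpoints.
  def stripOutdated(
      lastProcessedOffsets: Map[SystemStreamPartition, String],
      taskInputs: Set[SystemStream]): Map[SystemStreamPartition, String] =
    lastProcessedOffsets.filter { case (ssp, _) =>
      taskInputs.contains(ssp.systemStream)
    }
}
```

With a checkpoint that contains offsets for both foo and bar, and 
task.inputs=bar, only the bar entries survive. The trade-off is the one 
described above: once the next checkpoint is written, the foo offsets are gone.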

The other way to handle this is to fix each individual breakage. There are two 
places in the OffsetManager that break when input streams are switched:

# getSystemStreamPartitionsToReset
# loadDefaults

For getSystemStreamPartitionsToReset, we could log and ignore outdated streams. 
For loadDefaults, we could just use the global default (OffsetType.UPCOMING).
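
The per-breakage approach could look roughly like the sketch below. The stack 
trace suggests the current code uses a getOrElse whose default throws; here the 
default logs and returns false instead. All names (TolerantOffsetLookups, 
partitionsToReset, defaultFor, the OffsetType enumeration, and the shape of the 
settings maps) are assumptions made for illustration, not the real 
OffsetManager signatures.

```scala
// Hypothetical stand-ins for the Samza types discussed above.
case class SystemStream(system: String, stream: String)
case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

object OffsetType extends Enumeration {
  val UPCOMING, OLDEST = Value
}

object TolerantOffsetLookups {
  // Return the checkpointed partitions that should be reset. A stream with
  // no reset setting (i.e. no longer in task.inputs) is logged and ignored
  // rather than causing an exception.
  def partitionsToReset(
      checkpointed: Set[SystemStreamPartition],
      resetSettings: Map[SystemStream, Boolean]): Set[SystemStreamPartition] =
    checkpointed.filter { ssp =>
      resetSettings.getOrElse(ssp.systemStream, {
        Console.err.println(s"WARN: ignoring outdated stream ${ssp.systemStream}")
        false
      })
    }

  // Look up the default offset type for a stream, falling back to the
  // global default (UPCOMING) when the stream has no setting of its own.
  def defaultFor(
      stream: SystemStream,
      defaults: Map[SystemStream, OffsetType.Value]): OffsetType.Value =
    defaults.getOrElse(stream, OffsetType.UPCOMING)
}
```

Unlike stripping, this leaves the outdated offsets in the checkpoint, so 
switching task.inputs back later would still resume from them.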

> OffsetManager fails if a checkpointed topic isn't in task.inputs
> ----------------------------------------------------------------
>
>                 Key: SAMZA-267
>                 URL: https://issues.apache.org/jira/browse/SAMZA-267
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>             Fix For: 0.7.0
>
>
> If you run a job with task.inputs=foo, let the job checkpoint, then restart 
> it with task.inputs=bar, the last checkpoint will have foo in it. This will 
> cause the OffsetManager to fail with:
> {noformat}
> 2014-05-15 12:16:28 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to reset a stream that doesn't 
> have offset settings SystemStream [system=kafka, stream=foo].
>     at 
> org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at 
> org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>     at 
> org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:305)
>     at 
> org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:302)
>     at 
> scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
>     at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
>     at 
> org.apache.samza.checkpoint.OffsetManager.getSystemStreamPartitionsToReset(OffsetManager.scala:302)
>     at 
> org.apache.samza.checkpoint.OffsetManager.stripResetStreams(OffsetManager.scala:287)
>     at 
> org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:165)
>     at 
> org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
>     at 
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> We should just warn in this case, rather than fail the container.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
