[ 
https://issues.apache.org/jira/browse/SAMZA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14012457#comment-14012457
 ] 

Martin Kleppmann commented on SAMZA-267:
----------------------------------------

+1 looks good to me.

bq. The shouldKeep junk is ugly. Is there a more idiomatic way to do this in Scala?

In Ruby I would use {{#tap}} for this. Scala doesn't seem to have a built-in 
equivalent; it looks like people tend to [build their 
own|http://stackoverflow.com/questions/9671620/how-to-keep-return-value-when-logging-in-scala].
As there doesn't seem to be an agreed idiomatic name for the construct in 
Scala, it's probably clearest to do what you're doing now.
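
For reference, a home-grown {{tap}} might look roughly like the sketch below. 
This is only an illustration, not the code in the patch: the {{Tap}} helper, 
the stream names and the {{warnings}} buffer (standing in for a real logger) 
are all made up for the example. As far as I know, Scala 2.13 and later ship 
a similar {{tap}} in {{scala.util.chaining}}, but that is not available in 
older Scala versions.

```scala
import scala.collection.mutable.ListBuffer

object TapSketch {
  // Hypothetical helper: not part of Samza, and not built into older Scala
  // versions. Runs a side effect on a value, then returns the value unchanged.
  implicit class Tap[A](val self: A) extends AnyVal {
    def tap(sideEffect: A => Unit): A = {
      sideEffect(self)
      self
    }
  }

  // Made-up data standing in for task.inputs and the last checkpoint.
  val taskInputs: Set[String] = Set("bar")
  val checkpointedStreams: Seq[String] = Seq("foo", "bar")

  // Collects warnings in place of a real logger.
  val warnings: ListBuffer[String] = ListBuffer.empty[String]

  // Keep only streams still listed in task.inputs, warning about the rest.
  val keptStreams: Seq[String] = checkpointedStreams.filter { stream =>
    taskInputs.contains(stream).tap { shouldKeep =>
      if (!shouldKeep) {
        warnings += ("Ignoring checkpointed stream not in task.inputs: " + stream)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    warnings.foreach(println)
    println(keptStreams.mkString(", "))
  }
}
```

The trade-off is the one mentioned above: the filter body becomes a single 
expression, but readers have to know what the project-local {{tap}} does, 
whereas an explicit {{shouldKeep}} variable needs no explanation.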


> OffsetManager fails if a checkpointed topic isn't in task.inputs
> ----------------------------------------------------------------
>
>                 Key: SAMZA-267
>                 URL: https://issues.apache.org/jira/browse/SAMZA-267
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-267-0.patch
>
>
> If you run a job with task.inputs=foo, let the job checkpoint, then restart 
> it with task.inputs=bar, the last checkpoint will have foo in it. This will 
> cause the OffsetManager to fail with:
> {noformat}
> 2014-05-15 12:16:28 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to reset a stream that doesn't have offset settings SystemStream [system=kafka, stream=foo].
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:305)
>     at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:302)
>     at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
>     at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
>     at org.apache.samza.checkpoint.OffsetManager.getSystemStreamPartitionsToReset(OffsetManager.scala:302)
>     at org.apache.samza.checkpoint.OffsetManager.stripResetStreams(OffsetManager.scala:287)
>     at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:165)
>     at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> We should just warn in this case, rather than fail the container.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
