[ https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13935257#comment-13935257 ]

Chris Riccomini commented on SAMZA-157:
---------------------------------------

Second issue I've found is:

{noformat}
2014-03-13 23:45:26 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_treeid-bloom-filter_i001 and partition 2. Attempting to fetch message.
2014-03-13 23:45:26 KafkaCheckpointManager [INFO] Got offset 0 (no messages in state topic) for topic __samza_checkpoint_treeid-bloom-filter_i001 and partition Partition [partition=2], so returning null. If you expected the state topic to have messages, you're probably going to lose data.
2014-03-13 23:45:26 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
        at org.apache.samza.checkpoint.OffsetManager.org$apache$samza$checkpoint$OffsetManager$$restoreOffsetsFromCheckpoint(OffsetManager.scala:270)
        at org.apache.samza.checkpoint.OffsetManager$$anonfun$loadOffsetsFromCheckpointManager$2.apply(OffsetManager.scala:256)
        at org.apache.samza.checkpoint.OffsetManager$$anonfun$loadOffsetsFromCheckpointManager$2.apply(OffsetManager.scala:256)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.samza.checkpoint.OffsetManager.loadOffsetsFromCheckpointManager(OffsetManager.scala:256)
        at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:164)
        at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
{noformat}

This is easy to reproduce in a unit test just by returning a null checkpoint 
from a checkpoint manager.
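A minimal Java sketch of that reproduction, assuming simplified, hypothetical stand-ins (SimpleCheckpoint, SimpleCheckpointSource, OffsetRestore) rather than Samza's real checkpoint classes. A source that returns null for an empty checkpoint topic makes an unguarded restore throw a NullPointerException, analogous to the trace above, while a null check lets the caller fall back to a default offset instead of crashing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a checkpoint; NOT Samza's real Checkpoint class.
final class SimpleCheckpoint {
    private final Map<Integer, String> offsets; // partition -> last processed offset

    SimpleCheckpoint(Map<Integer, String> offsets) {
        this.offsets = offsets;
    }

    Map<Integer, String> getOffsets() {
        return offsets;
    }
}

// Hypothetical checkpoint source; may return null when the checkpoint topic is empty,
// mirroring the "returning null" log line above.
interface SimpleCheckpointSource {
    SimpleCheckpoint readLastCheckpoint(int partition);
}

final class OffsetRestore {
    // Unsafe: dereferences the checkpoint without a null check, so a missing
    // checkpoint surfaces as a NullPointerException.
    static Map<Integer, String> restoreUnsafe(SimpleCheckpointSource source, int partition) {
        return source.readLastCheckpoint(partition).getOffsets();
    }

    // Null-safe: a missing checkpoint contributes no offsets, leaving the caller
    // free to apply a default-offset policy instead.
    static Map<Integer, String> restoreSafe(SimpleCheckpointSource source, int partition) {
        SimpleCheckpoint checkpoint = source.readLastCheckpoint(partition);
        if (checkpoint == null) {
            return Collections.emptyMap();
        }
        return new HashMap<>(checkpoint.getOffsets());
    }
}
```

In a unit test, a lambda such as `p -> null` is enough to stand in for the empty checkpoint topic.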

> Offset default behavior for streams
> -----------------------------------
>
>                 Key: SAMZA-157
>                 URL: https://issues.apache.org/jira/browse/SAMZA-157
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-157.0.patch, SAMZA-157.1.patch, SAMZA-157.3.patch, SAMZA-157.4.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default
> configuration, which specifies what to do when no checkpoint exists for an
> input topic. This is similar to Kafka's auto.offset.reset setting.
> Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream
> partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This is obviously dependent on offsets not having ':' or ',' in them, which I 
> think is a safe assumption.
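A sketch of parsing the proposed samza.force.offsets value, written in Java with a hypothetical ForceOffsetsParser class (not part of Samza). It relies on the stated assumption that offsets contain no ':' or ',', additionally assumes partition strings don't either, and rejects entries that break that assumption rather than guessing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ForceOffsetsParser {
    // Parses "<partition string>:<force offset>,..." into a map that keeps the
    // configured order. Hypothetical helper; assumes no ':' or ',' inside
    // partition strings or offsets.
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result; // nothing forced
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf(':');
            // Reject a missing partition, a missing offset, or a second ':'.
            if (sep <= 0 || sep == trimmed.length() - 1 || trimmed.indexOf(':', sep + 1) >= 0) {
                throw new IllegalArgumentException("Malformed force offset entry: '" + trimmed + "'");
            }
            String partition = trimmed.substring(0, sep);
            String offset = trimmed.substring(sep + 1);
            if (result.put(partition, offset) != null) {
                throw new IllegalArgumentException("Duplicate partition: " + partition);
            }
        }
        return result;
    }
}
```

Failing fast on a malformed entry seems safer than silently skipping it, since a skipped entry would quietly fall back to the checkpoint.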
> This setting would force the system consumer to be registered with the
> specified offset for the given SSP (ignoring both the checkpoint, if it
> exists, and the samza.offset.default setting).
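The precedence described in the issue (a forced offset first, then the checkpoint, then the offset-default policy of "oldest", "latest", or "fail") can be sketched roughly as follows. The OffsetDefault enum and StartingOffsetResolver class are hypothetical, and oldestOffset/latestOffset stand in for values a real system consumer would have to look up from stream metadata.

```java
import java.util.Map;

// Hypothetical enum for the proposed "oldest", "latest", and "fail" values.
enum OffsetDefault { OLDEST, LATEST, FAIL }

final class StartingOffsetResolver {
    // Picks the offset a consumer would be registered with for one partition:
    //   1. a forced offset, if configured, overrides everything;
    //   2. otherwise the checkpointed offset, if one exists;
    //   3. otherwise the offset-default policy.
    static String resolve(
            String partition,
            Map<String, String> forcedOffsets,
            Map<String, String> checkpointedOffsets,
            OffsetDefault offsetDefault,
            String oldestOffset,
            String latestOffset) {
        String forced = forcedOffsets.get(partition);
        if (forced != null) {
            return forced;
        }
        String checkpointed = checkpointedOffsets.get(partition);
        if (checkpointed != null) {
            return checkpointed;
        }
        switch (offsetDefault) {
            case OLDEST:
                return oldestOffset;
            case LATEST:
                return latestOffset;
            case FAIL:
            default:
                throw new IllegalStateException(
                        "No checkpoint or forced offset for partition " + partition);
        }
    }
}
```

Checking the policy last means "fail" only triggers when neither a forced nor a checkpointed offset is available, which is the "no checkpoint exists" case the setting is meant to cover.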



--
This message was sent by Atlassian JIRA
(v6.2#6252)
