[ https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13935257#comment-13935257 ]
Chris Riccomini commented on SAMZA-157:
---------------------------------------
The second issue I've found is a NullPointerException in OffsetManager when the
checkpoint manager returns null:
{noformat}
2014-03-13 23:45:26 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_treeid-bloom-filter_i001 and partition 2. Attempting to fetch message.
2014-03-13 23:45:26 KafkaCheckpointManager [INFO] Got offset 0 (no messages in state topic) for topic __samza_checkpoint_treeid-bloom-filter_i001 and partition Partition [partition=2], so returning null. If you expected the state topic to have messages, you're probably going to lose data.
2014-03-13 23:45:26 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
    at org.apache.samza.checkpoint.OffsetManager.org$apache$samza$checkpoint$OffsetManager$$restoreOffsetsFromCheckpoint(OffsetManager.scala:270)
    at org.apache.samza.checkpoint.OffsetManager$$anonfun$loadOffsetsFromCheckpointManager$2.apply(OffsetManager.scala:256)
    at org.apache.samza.checkpoint.OffsetManager$$anonfun$loadOffsetsFromCheckpointManager$2.apply(OffsetManager.scala:256)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.samza.checkpoint.OffsetManager.loadOffsetsFromCheckpointManager(OffsetManager.scala:256)
    at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:164)
    at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
{noformat}
This is easy to reproduce in a unit test just by returning a null checkpoint
from a checkpoint manager.
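
To illustrate the failure mode, here is a rough sketch in Python rather than the
actual Scala code. Every name in it (NullCheckpointManager,
read_last_checkpoint, restore_offsets_unsafe, restore_offsets_safe) is
hypothetical and not a Samza API. It also assumes that one reasonable fix is to
treat a null checkpoint as "no offsets to restore", which may not be what the
patch ends up doing.

```python
from typing import Dict, Optional


class NullCheckpointManager:
    """Hypothetical stand-in for a checkpoint manager with nothing stored."""

    def read_last_checkpoint(self, partition: int) -> Optional[Dict[str, str]]:
        # Mirrors the "so returning null" case in the log above.
        return None


def restore_offsets_unsafe(manager, partition: int) -> Dict[str, str]:
    checkpoint = manager.read_last_checkpoint(partition)
    # Dereferences the checkpoint without checking it; fails when it is None.
    return dict(checkpoint.items())


def restore_offsets_safe(manager, partition: int) -> Dict[str, str]:
    checkpoint = manager.read_last_checkpoint(partition)
    if checkpoint is None:
        # Assumption: a missing checkpoint means there is nothing to restore.
        return {}
    return dict(checkpoint)
```

The unsafe variant fails on the missing checkpoint, which is Python's analogue
of the NullPointerException above, while the guarded variant returns an empty
mapping so the caller can fall back to a default.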
> Offset default behavior for streams
> -----------------------------------
>
> Key: SAMZA-157
> URL: https://issues.apache.org/jira/browse/SAMZA-157
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-157.0.patch, SAMZA-157.1.patch, SAMZA-157.3.patch,
> SAMZA-157.4.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default
> configuration, which specifies what to do when no checkpoint exists for an
> input topic. This is similar to Kafka's auto.offset.reset setting.
> Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream
> partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This is obviously dependent on offsets not having ':' or ',' in them, which I
> think is a safe assumption.
> This setting would force the system consumer to be registered with the
> specified offset for the given SSP (ignoring both the checkpoint, if it
> exists, and the samza.reset.offset setting).
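
The proposed format and precedence could be sketched roughly as follows. This is
a Python illustration only, not Samza code: parse_force_offsets and
starting_offset are hypothetical names, and the ordering (forced offset, then
checkpoint, then the default) is my reading of the description above. It relies
on the stated assumption that offsets contain neither ':' nor ','.

```python
from typing import Dict, Optional


def parse_force_offsets(value: str) -> Dict[str, str]:
    """Parse '<partition string>:<force offset>,...' into {partition: offset}."""
    offsets: Dict[str, str] = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the first ':' only; both sides must be non-empty.
        partition, sep, offset = entry.partition(":")
        if not sep or not partition or not offset:
            raise ValueError(f"malformed force offset entry: {entry!r}")
        offsets[partition] = offset
    return offsets


def starting_offset(
    partition: str,
    forced: Dict[str, str],
    checkpoint: Optional[Dict[str, str]],
    offset_default: str,
) -> str:
    """Pick where a partition should start reading (hypothetical helper)."""
    if partition in forced:
        # A forced offset wins even when a checkpoint exists.
        return forced[partition]
    if checkpoint is not None and partition in checkpoint:
        return checkpoint[partition]
    if offset_default == "fail":
        raise RuntimeError(f"no checkpoint for partition {partition}")
    if offset_default in ("oldest", "latest"):
        # Returned symbolically; resolving it to a real offset is system-specific.
        return offset_default
    raise ValueError(f"unknown offset default: {offset_default!r}")
```

With the example value 0:123,1:123,2:123, partition 0 starts at 123 regardless
of its checkpoint, while a partition with neither a forced offset nor a
checkpoint falls through to the configured default.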