[ https://issues.apache.org/jira/browse/SAMZA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13935239#comment-13935239 ]

Chris Riccomini commented on SAMZA-157:
---------------------------------------

The first issue that I see is that a job started with offset.default=upcoming 
(the default) will never start. The stack trace is:

{noformat}
2014-03-14 09:36:08 GetOffset [INFO] Validating offset 0 for topic and partition [PageViewEventByGroup,0]
2014-03-14 09:36:09 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 8192 (requested -1).
2014-03-14 09:36:09 GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [PageViewEventByGroup,0]. Using it to instantiate consumer.
2014-03-14 09:36:09 KafkaSystemConsumer [WARN] An exception was thrown while refreshing brokers for [PageViewEventByGroup,0]. Waiting a bit and retrying, since we can't continue without broker metadata.
2014-03-14 09:36:09 KafkaSystemConsumer [DEBUG] Exception while refreshing brokers
org.apache.samza.SamzaException: Got empty message set for a valid offset. This is unexpected.
        at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
        at org.apache.samza.system.kafka.GetOffset.toss(GetOffset.scala:35)
        at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:70)
        at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.refresh$1(KafkaSystemConsumer.scala:125)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:133)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.start(KafkaSystemConsumer.scala:79)
        at org.apache.samza.system.SystemConsumers$$anonfun$start$4.apply(SystemConsumers.scala:167)
        at org.apache.samza.system.SystemConsumers$$anonfun$start$4.apply(SystemConsumers.scala:167)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.system.SystemConsumers.start(SystemConsumers.scala:167)
        at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:590)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:496)
        at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
{noformat}

This is because we're fetching at the upcoming offset, where no messages have
been written yet. The resulting MessageSet is empty, so the subsequent check
that it contains messages fails. This check no longer makes sense and should
be removed.
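A minimal Scala sketch of the failing check and of the proposed removal follows. It does not use the real Kafka or Samza APIs: FetchResponse and OffsetValidation are hypothetical stand-ins, IllegalStateException stands in for SamzaException to keep the example self-contained, and it assumes a truly invalid offset is reported through an error code (such as Kafka's OffsetOutOfRange) rather than through an empty message set.

```scala
// Hypothetical model of a single-partition fetch result; not the Kafka API.
final case class FetchResponse(errorCode: Option[String], messages: Seq[String])

object OffsetValidation {
  // Old behavior: an empty message set for an offset that did not return
  // an error is treated as a fatal inconsistency.
  def isValidOffsetOld(resp: FetchResponse): Boolean = resp.errorCode match {
    case Some(_) => false
    case None =>
      if (resp.messages.isEmpty)
        throw new IllegalStateException(
          "Got empty message set for a valid offset. This is unexpected.")
      true
  }

  // Sketched fix: only the error code decides validity. An empty message
  // set is expected when fetching at the upcoming offset.
  def isValidOffsetNew(resp: FetchResponse): Boolean = resp.errorCode.isEmpty
}
```

With the empty-set check gone, a consumer starting at the upcoming offset is simply registered and waits for new messages instead of retrying forever.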

> Offset default behavior for streams
> -----------------------------------
>
>                 Key: SAMZA-157
>                 URL: https://issues.apache.org/jira/browse/SAMZA-157
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-157.0.patch, SAMZA-157.1.patch, SAMZA-157.3.patch, SAMZA-157.4.patch
>
>
> Introduce a systems.<system name>.streams.<stream name>.samza.offset.default 
> configuration, which specifies what to do when no checkpoint exists for an 
> input topic. This is similar to Kafka's auto.offset.reset
> setting. Developers will be able to specify "oldest", "latest", or "fail".
> We should also add the ability to override offsets for specific stream
> partitions. Something like:
> {noformat}
> systems.<system name>.streams.<stream name>.samza.force.offsets=0:123,1:123,2:123
> {noformat}
> The format I'm proposing is:
> {noformat}
> <partition string>:<force offset>,...
> {noformat}
> This is obviously dependent on offsets not having ':' or ',' in them, which I 
> think is a safe assumption.
> This setting would force the system consumer to be registered with the 
> specified offset for the given SSP (ignoring both the checkpoint, if it 
> exists, and the samza.reset.offset setting).
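The proposed force.offsets format and the precedence rules above can be sketched in Scala. This is a minimal illustration under stated assumptions, not Samza's implementation: ForceOffsets, parse, and startingOffset are hypothetical names, partitions and offsets are kept as plain strings, and the "oldest"/"latest" positions are passed in by the caller. As proposed, a forced offset wins over the checkpoint, and the offset.default setting only applies when neither exists.

```scala
object ForceOffsets {
  // Parses "<partition string>:<force offset>,...", e.g. "0:123,1:123,2:123".
  // Assumes, as the proposal does, that neither part contains ':' or ','.
  def parse(value: String): Map[String, String] =
    value.split(",").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { entry =>
        entry.split(":", -1) match {
          case Array(partition, offset) if partition.nonEmpty && offset.nonEmpty =>
            partition -> offset
          case _ =>
            throw new IllegalArgumentException(s"Malformed force offset entry: '$entry'")
        }
      }
      .toMap

  // Resolves the starting offset for one partition:
  // forced offset, then checkpoint, then offset.default ("oldest", "latest", "fail").
  def startingOffset(
      partition: String,
      forced: Map[String, String],
      checkpoint: Option[String],
      offsetDefault: String,
      oldest: String,
      latest: String): String =
    forced.get(partition)
      .orElse(checkpoint)
      .getOrElse(offsetDefault match {
        case "oldest" => oldest
        case "latest" => latest
        case "fail" =>
          throw new IllegalStateException(
            s"No checkpoint for partition $partition and offset.default=fail")
        case other =>
          throw new IllegalArgumentException(s"Unknown offset.default value: $other")
      })
}
```

Keeping parsing separate from the per-partition lookup means a malformed force.offsets value would fail when the config is read, before any consumer is registered.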



--
This message was sent by Atlassian JIRA
(v6.2#6252)
