I'm finally seeing events flowing again. Yes, the "systems.kafka.consumer.auto.offset.reset" option is probably not a factor here. And yes, I am using checkpointing (Kafka). I'm not sure whether the checkpointed offsets were messed up, but setting "systems.kafka.streams.nogoalids.samza.reset.offset=true" reset the offsets to the newest ones, and events started coming after that. It is still unclear to me how things got stuck in the first place.
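
For anyone hitting the same symptom, here is a rough sketch of the settings involved. The first key is the one I set; the second is my assumption about what decides where the job lands after the reset (I did not set it explicitly, and "upcoming" should be the default). As far as I understand, the reset is applied on every container start while the flag stays true, so it is probably worth removing once the job is healthy again.

```properties
# Ignore the checkpointed offset for this input stream when the container starts.
systems.kafka.streams.nogoalids.samza.reset.offset=true

# Assumption: where to start when no checkpoint is used.
# "upcoming" = only new messages, "oldest" = the earliest retained message.
systems.kafka.streams.nogoalids.samza.offset.default=upcoming
```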
On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

> Hi David,
> This configuration you have tweaked
> (systems.kafka.consumer.auto.offset.reset) is honored only when one of the
> following conditions holds:
> * topic doesn't exist
> * checkpoint is older than the maximum message history retained by the
> brokers
>
> So, my questions are:
> Are you using checkpointing? If you do, you can read the checkpoint topic
> to see the offset that is being used to fetch data.
>
> If you are not using checkpoints, then samza uses
> systems.kafka.samza.offset.default to decide whether to start reading from
> the earliest (oldest data) or upcoming (newest data) offset in the stream.
>
> This could explain from where your job is trying to consume and you can
> cross-check with the broker.
> For the purpose of debugging, you can print a debug line in process()
> method to print the offset of the message you are processing
> (message.getOffset). Please remember to remove the debug line after
> troubleshooting. Else you risk filling up your logs.
>
> Let me know if you have more questions.
>
> Thanks!
> Navina
>
> On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david...@optimizely.com> wrote:
>
> > I'm trying to debug our samza job, which seems to be stuck from consuming
> > from our Kafka stream.
> >
> > Every time I redeploy the job, only the same handful of events get
> > consumed, and then no more events get processed. I manually checked to make
> > sure the input stream is live and flowing. I also tried both the following:
> >
> > systems.kafka.consumer.auto.offset.reset=largest
> > systems.kafka.consumer.auto.offset.reset=smallest
> >
> > I'm also seeing the following from the log:
> >
> > ... partitionMetadata={Partition
> > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> > newestOffset=202708, upcomingOffset=202709], Partition
> > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> > newestOffset=200521, upcomingOffset=200522], ...
> >
> >
> > Not sure what other ways I could diagnose this problem. Any suggestion is
> > appreciated.
> >
>
>
>
> --
> Navina R.
>
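
The cross-check against the broker that Navina suggests can be sketched in a few lines of plain Java. This is only an illustration: OffsetCheck, Status, classify and lag are hypothetical names and not part of the Samza or Kafka APIs, and the sketch assumes you already know the offset the job would fetch next (for example from the checkpoint topic). The oldest and upcoming offsets in main are the partition 0 values from the log above; the next-offset value is made up for the example.

```java
// Hypothetical helper, not part of the Samza API: compares the offset a job
// would fetch next with the offset range the broker reports for a partition.
final class OffsetCheck {

    enum Status { BEFORE_RETAINED_RANGE, IN_RANGE, CAUGHT_UP, BEYOND_UPCOMING }

    /**
     * @param nextOffset offset the job would fetch next (assumed to be known)
     * @param oldest     oldestOffset from the partition metadata
     * @param upcoming   upcomingOffset from the partition metadata
     */
    static Status classify(long nextOffset, long oldest, long upcoming) {
        if (nextOffset < oldest) {
            // The broker no longer retains this data; this is the case in which
            // auto.offset.reset is said to be honored.
            return Status.BEFORE_RETAINED_RANGE;
        }
        if (nextOffset < upcoming) {
            return Status.IN_RANGE;
        }
        if (nextOffset == upcoming) {
            // Nothing left to read until new messages arrive.
            return Status.CAUGHT_UP;
        }
        // The offset points past anything the broker has written.
        return Status.BEYOND_UPCOMING;
    }

    /** Number of messages between the next offset and the end of the partition. */
    static long lag(long nextOffset, long upcoming) {
        return Math.max(0L, upcoming - nextOffset);
    }

    public static void main(String[] args) {
        long oldest = 144907L;      // partition 0, from the log above
        long upcoming = 202709L;    // partition 0, from the log above
        long nextOffset = 150000L;  // made-up value for illustration

        System.out.println(classify(nextOffset, oldest, upcoming)
                + ", lag=" + lag(nextOffset, upcoming));
    }
}
```

If the result is IN_RANGE with a lag that keeps growing across restarts, the job is positioned correctly but is not making progress, which points away from the offset settings and toward the task itself.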