Hi David,

Appreciate the feedback. Currently, Samza is single-threaded. The process() callback executes in the context of the same thread that processes events. You can take a thread dump (repeatedly, over a period of time) and figure out where the threads are stuck to debug these kinds of issues. In general, it is a good idea to time out any remote calls you make.
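[Editor's note: to illustrate the timeout advice above, here is a minimal sketch using only the JDK; the `Callable` passed in stands in for whatever blocking remote/service call your job makes, and the names here are illustrative, not part of any Samza API.]

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RemoteCallTimeout {
    private static final ExecutorService POOL = Executors.newSingleThreadExecutor();

    // Wrap a blocking call with a hard deadline so process() can never hang forever.
    // Returns null on timeout; the caller decides whether to retry, skip, or fail.
    public static <T> T callWithTimeout(Callable<T> remoteCall, long timeoutMs) {
        Future<T> future = POOL.submit(remoteCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call so the thread frees up
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A fast call completes normally...
        System.out.println(callWithTimeout(() -> "ok", 1000));
        // ...while a call slower than the deadline yields null instead of hanging.
        System.out.println(callWithTimeout(() -> { Thread.sleep(2000); return "late"; }, 100));
        POOL.shutdownNow();
    }
}
```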
We're building multi-threaded support into Samza (SAMZA-863), including asynchronous processing (to support remote calls and async I/O). Please have a look at the JIRA and provide feedback on the API in the ticket, so we can better align the design/implementation to address your use case.

Thanks,
Jagadish

On Thu, Mar 17, 2016 at 7:56 PM, David Yu <david...@optimizely.com> wrote:

> Looks like this has nothing to do with checkpointing. Our Samza job has an
> issue communicating with an external service, which left the particular
> process() call waiting indefinitely. And it doesn't look like Samza has a
> way to time out a processing cycle.
>
> On Thu, Mar 17, 2016 at 5:42 PM, David Yu <david...@optimizely.com> wrote:
>
> > Strangely, I was not able to get the checkpoint value for one particular
> > partition. Could this cause the job to be stuck?
> >
> > On Thu, Mar 17, 2016 at 5:23 PM, David Yu <david...@optimizely.com> wrote:
> >
> >> Hi, I want to resurface this thread because I'm still facing issues with
> >> our Samza job not receiving events.
> >>
> >> Our job's "SamzaContainerMetrics.process-calls" metric dropped to zero
> >> today again, and so (of course) did
> >> "SamzaContainerMetrics.process-envelopes". The current topic offset and
> >> task checkpoint suggest that everything looks good:
> >>
> >> Topic partition 18 offset (as of now) = 488986
> >> Current checkpoint for taskname Partition 18:
> >> tasknames.Partition 18.systems.kafka.streams.nogoalids.partitions.18 = 474222
> >>
> >> Even after redeploying the job, everything still seemed stuck :(
> >>
> >> Any ideas that could help me debug this will be appreciated.
> >>
> >> On Wed, Mar 16, 2016 at 4:19 PM, David Yu <david...@optimizely.com>
> >> wrote:
> >>
> >>> No, instead, I updated the checkpoint topic with the "upcoming"
> >>> offsets. (I should have done a check before that, though.)
> >>>
> >>> So a related question: if I delete the checkpoint topic from Kafka,
> >>> that would essentially clear all the offset info, and Samza would be
> >>> able to recreate the topic with the default offsets (e.g. smallest).
> >>> Is that correct? I just want to find an easy way to do a "reprocess
> >>> all" kind of operation.
> >>>
> >>> Thanks.
> >>>
> >>> On Wed, Mar 16, 2016 at 3:25 PM, Navina Ramesh <
> >>> nram...@linkedin.com.invalid> wrote:
> >>>
> >>>> Strange. I am unable to comment on the behavior because I don't know
> >>>> what your checkpoints looked like in the checkpoint topic.
> >>>>
> >>>> Did you try reading the checkpoint topic log?
> >>>>
> >>>> If you are setting systems.kafka.streams.nogoalids.samza.reset.offset
> >>>> = true, you are essentially ignoring checkpoints for that stream. Do
> >>>> verify that you are reading from the correct offset in the stream :)
> >>>>
> >>>> Thanks!
> >>>> Navina
> >>>>
> >>>> On Wed, Mar 16, 2016 at 3:16 PM, David Yu <david...@optimizely.com>
> >>>> wrote:
> >>>>
> >>>> > Finally seeing events flowing again.
> >>>> >
> >>>> > Yes, the "systems.kafka.consumer.auto.offset.reset" option is
> >>>> > probably not a factor here. And yes, I am using checkpointing
> >>>> > (Kafka). Not sure if the offsets are messed up, but I was able to
> >>>> > use "systems.kafka.streams.nogoalids.samza.reset.offset=true" to
> >>>> > reset the offsets to the newest ones. After that, events started
> >>>> > coming. Still, it is unclear to me how things got stuck in the
> >>>> > first place.
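[Editor's note: for readers looking for the "reprocess all" recipe discussed in this thread, the relevant settings would look roughly like the sketch below in the job's properties file. The stream name `nogoalids` is taken from this thread, and the exact option names should be verified against your Samza version's configuration reference.]

```properties
# Ignore the stored checkpoint for this one input stream on the next deploy...
systems.kafka.streams.nogoalids.samza.reset.offset=true
# ...and choose where the reset lands: oldest = reprocess everything,
# upcoming = skip straight to newly arriving data.
systems.kafka.streams.nogoalids.samza.offset.default=oldest
```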
> >>>> >
> >>>> > On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh
> >>>> > <nram...@linkedin.com.invalid> wrote:
> >>>> >
> >>>> > > Hi David,
> >>>> > > The configuration you have tweaked
> >>>> > > (systems.kafka.consumer.auto.offset.reset) is honored only when
> >>>> > > one of the following conditions holds:
> >>>> > > * the topic doesn't exist
> >>>> > > * the checkpoint is older than the maximum message history
> >>>> > > retained by the brokers
> >>>> > >
> >>>> > > So, my questions are:
> >>>> > > Are you using checkpointing? If you are, you can read the
> >>>> > > checkpoint topic to see the offset that is being used to fetch
> >>>> > > data.
> >>>> > >
> >>>> > > If you are not using checkpoints, then Samza uses
> >>>> > > systems.kafka.samza.offset.default to decide whether to start
> >>>> > > reading from the earliest (oldest data) or upcoming (newest data)
> >>>> > > offset in the stream.
> >>>> > >
> >>>> > > This could explain where your job is trying to consume from, and
> >>>> > > you can cross-check with the broker.
> >>>> > > For the purpose of debugging, you can add a debug line in the
> >>>> > > process() method to print the offset of the message you are
> >>>> > > processing (message.getOffset). Please remember to remove the
> >>>> > > debug line after troubleshooting; otherwise you risk filling up
> >>>> > > your logs.
> >>>> > >
> >>>> > > Let me know if you have more questions.
> >>>> > >
> >>>> > > Thanks!
> >>>> > > Navina
> >>>> > >
> >>>> > > On Wed, Mar 16, 2016 at 2:12 PM, David Yu
> >>>> > > <david...@optimizely.com> wrote:
> >>>> > >
> >>>> > > > I'm trying to debug our Samza job, which seems to be stuck
> >>>> > > > consuming from our Kafka stream.
> >>>> > > >
> >>>> > > > Every time I redeploy the job, only the same handful of events
> >>>> > > > get consumed, and then no more events get processed.
> >>>> > > > I manually checked to make sure the input stream is live and
> >>>> > > > flowing. I also tried both of the following:
> >>>> > > >
> >>>> > > > systems.kafka.consumer.auto.offset.reset=largest
> >>>> > > > systems.kafka.consumer.auto.offset.reset=smallest
> >>>> > > >
> >>>> > > > I'm also seeing the following in the log:
> >>>> > > >
> >>>> > > > ... partitionMetadata={Partition
> >>>> > > > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> >>>> > > > newestOffset=202708, upcomingOffset=202709], Partition
> >>>> > > > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> >>>> > > > newestOffset=200521, upcomingOffset=200522], ...
> >>>> > > >
> >>>> > > > Not sure what other ways I could diagnose this problem. Any
> >>>> > > > suggestion is appreciated.
> >>>> > > >
> >>>> > >
> >>>> > > --
> >>>> > > Navina R.
> >>>> >
> >>>>
> >>>> --
> >>>> Navina R.
> >>>
> >>
> >
>

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
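[Editor's note: a sketch of Navina's offset-logging suggestion from earlier in the thread. In a real job the message arrives as Samza's IncomingMessageEnvelope, whose accessor is getOffset(); the Envelope interface below is a minimal stand-in so the snippet is self-contained, and is not the actual Samza API.]

```java
public class OffsetDebugTask {
    // Minimal stand-in for org.apache.samza.system.IncomingMessageEnvelope.
    public interface Envelope {
        String getOffset();
        Object getMessage();
    }

    private long processedCount = 0;
    private String lastOffset = null;

    // Per-message callback: log the offset of each message so that, if the job
    // stalls, the last logged offset shows exactly where consumption stopped.
    // Remember to remove (or lower) the log line after troubleshooting.
    public void process(Envelope envelope) {
        processedCount++;
        lastOffset = envelope.getOffset();
        System.out.println("DEBUG processing offset=" + lastOffset);
        // ... actual message handling would go here ...
    }

    public long getProcessedCount() { return processedCount; }
    public String getLastOffset() { return lastOffset; }
}
```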