Hi, Jeremiah,

CheckpointManager is an interface that lets you implement where the checkpoint offsets for each task are read from and written to. The API exposed to users for checkpointing in Samza is TaskCoordinator.commit(). Unfortunately, it does not let you commit at the granularity of a specific offset on a specific partition. The design choice we made is that when the user calls commit(), it means the user has successfully processed all messages up to the current one. The recommended pattern is to wait until all currently pending messages have been processed, and then call commit().
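
As a rough illustration of that pattern (a sketch only, not Samza code): the Coordinator and RemoteService interfaces below are hypothetical stand-ins, with Coordinator.commit() playing the role of TaskCoordinator.commit() and process() mirroring the shape of StreamTask.process(). Messages that could not be delivered stay in an in-memory queue, and commit() is called only once that queue is empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for Samza's TaskCoordinator, reduced to the one call discussed here.
interface Coordinator {
    void commit();
}

// Hypothetical remote client; send() returns false when the remote system is unavailable.
interface RemoteService {
    boolean send(String message);
}

class CommitWhenDrainedTask {
    private final RemoteService remote;
    // Messages received but not yet delivered to the remote system, oldest first.
    private final Deque<String> pending = new ArrayDeque<>();

    CommitWhenDrainedTask(RemoteService remote) {
        this.remote = remote;
    }

    // Called once per incoming message, like StreamTask.process().
    void process(String message, Coordinator coordinator) {
        pending.addLast(message);
        while (!pending.isEmpty()) {
            if (!remote.send(pending.peekFirst())) {
                // Remote system is down: keep everything pending and skip the commit.
                return;
            }
            pending.removeFirst();
        }
        // Every message up to the current one has been delivered, so it is safe to checkpoint.
        // A real job would probably commit less often than once per message.
        coordinator.commit();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Since nothing is committed while the remote system is down, a restart replays from the last checkpoint, so the remote side should tolerate duplicate messages.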

Please let me know if you have further doubts.

-Yi

On Tue, Jun 14, 2016 at 2:13 PM, Jeremiah Adams <jad...@helixeducation.com> wrote:

> Thanks,
>
> I see no commit method in TaskContext. Unless I am missing something it is
> TaskCoordinator. TaskCoordinator.commit() also does not look to give me the
> ability to set the value of the checkpoint, just checkpoint after
> unwrapping the incoming message. I need to treat the messages as if they
> were not handled at all when the remote system is unavailable.
>
> I have been looking at the CheckpointManager to do this but cannot see how
> to wire it into my StreamTask.
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter | Facebook | LinkedIn
>
> ________________________________________
> From: Yi Pan <nickpa...@gmail.com>
> Sent: Tuesday, June 14, 2016 2:28 PM
> To: dev@samza.apache.org
> Subject: Re: Manually Commit Offsets?
>
> Sorry. Correction:
>
> > 2) in your code, call TaskContext.commit() whenever you are ready to
> > checkpoint.
>
> *TaskCoordinator.commit()*
>
> On Tue, Jun 14, 2016 at 10:16 AM, Jeremiah Adams
> <jad...@helixeducation.com> wrote:
>
>> We need to send messages to a remote service. I need to implement a
>> circuit breaker to address the scenario in which the remote system is
>> unavailable. I need to change the current offset to reprocess the current
>> offset while the remote system is down. These concerns are similar to those
>> outlined here: https://issues.apache.org/jira/browse/SAMZA-794
>>
>> It looks like Samza's Checkpointing mechanism replaces kafka's
>> auto-commit feature and there is no API for manually manipulating the
>> Checkpointing?
>>
>> Can someone point me in the right direction?
>>
>> Thanks in advance.
>>
>> Jeremiah Adams
>> Software Engineer
>> www.helixeducation.com <http://www.helixeducation.com/>
>> Blog <http://www.helixeducation.com/blog/> |
>> Twitter <https://twitter.com/HelixEducation> |
>> Facebook <https://www.facebook.com/HelixEducation> |
>> LinkedIn <http://www.linkedin.com/company/3609946>