@Sushil, I have several jobs running on KafkaIO + FlinkRunner; hope my experience can help you a bit.
In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement; you need to leverage exactly-once checkpoints/savepoints in Flink. The reason is that with `ENABLE_AUTO_COMMIT_CONFIG`, KafkaIO commits offsets as soon as data is read, so once the job restarts KafkaIO reads from the last committed offset. In my jobs, I enable externalized checkpoints (externalized should be optional, I think?) in exactly-once mode on the Flink cluster. When a job auto-restarts on failure, it doesn't lose data. When I redeploy a job manually, I use a savepoint to cancel it and launch the new version.
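Concretely, the runner side of that setup looks roughly like the sketch below. This is a minimal sketch, not my production code: the broker address, topic name, and 60s checkpoint interval are placeholders, exactly-once is Flink's default checkpointing mode, and externalized-checkpoint retention itself is configured on the Flink cluster (e.g. `state.checkpoints.dir`). The savepoint commands in the trailing comments are the standard Flink CLI.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaExactlyOnceSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Enable Flink checkpointing; without an interval no checkpoints are taken.
        // 60s is a placeholder -- tune it for your job.
        options.setCheckpointingInterval(60000L);

        Pipeline p = Pipeline.create(options);
        p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092")   // placeholder broker address
            .withTopic("my-topic")                   // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
        // ... windowing / GroupByKey / ParDo as in your pipeline ...
        p.run();

        // Manual redeploy via savepoint (standard Flink CLI):
        //   flink cancel -s <savepointDir> <jobId>    # take a savepoint, then cancel
        //   flink run -s <savepointPath> <job jar>    # relaunch from that savepoint
      }
    }

With checkpointing on, a job that auto-restarts after a failure resumes from the last completed checkpoint instead of from Kafka's committed offsets, which is what gives you the no-data-loss behavior.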