That seems to be an issue with how the commit is handled when restarting in Samza, and not with the Kafka source.
On Thu, Jul 11, 2019 at 4:44 PM Deshpande, Omkar <omkar_deshpa...@intuit.com> wrote:

> Yes, we are resuming from Samza's last commit. But the problem is that
> the last commit was done for data in a window that is not completely
> processed.
>
> *From:* Lukasz Cwik <lc...@google.com>
> *Date:* Wednesday, July 10, 2019 at 11:07 AM
> *To:* dev <d...@beam.apache.org>
> *Cc:* "LeVeck, Matt" <matt_lev...@intuit.com>, "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>, "Cesar, Scott" <scott_ce...@intuit.com>, "Ho, Tom" <tom...@intuit.com>, "dev@samza.apache.org" <dev@samza.apache.org>
> *Subject:* Re: Beam/Samza Ensuring At Least Once semantics
>
> When you restart the application, are you resuming it from Samza's last
> commit?
>
> Since the exception is thrown after the GBK, all the data could be read
> from Kafka, forwarded to the GBK operator inside of Samza, and
> checkpointed in Kafka before the exception is ever thrown.
>
> On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <mikhail_benen...@intuit.com> wrote:
>
> Hi,
>
> I have run a few experiments to verify whether 'at least once' processing
> is guaranteed on Beam 2.13.0 with Samza runner 1.1.0.
>
> The Beam application is a slightly modified streaming word count from the
> Beam examples:
>
> - read strings from the input Kafka topic, print (topic, partition, offset, value)
> - convert values to pairs (value, 1)
> - group in fixed windows with a duration of 30 sec
> - sum per key
> - throw an exception if the key starts with 'm'
> - write (key, sum) to the output Kafka topic
>
> I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there
> is no difference in the results.
>
> Please see the attached source code.
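[The attached source is not reproduced in the archive. As a rough, pure-Python sketch of what the experiment pipeline does, with a single fixed window collapsed into one batch; names and structure are illustrative, not the actual Beam code:]

```python
from collections import defaultdict

def run_pipeline(records, fail_prefix="m"):
    """Sketch of the experiment: pair each value with 1, group within one
    fixed window, sum per key, and fail on keys starting with fail_prefix
    (simulating the exception thrown in the transform after the GBK)."""
    counts = defaultdict(int)
    for value in records:          # read from input topic + (value, 1) conversion
        counts[value] += 1         # GroupByKey + Sum within one window
    output = {}
    for key, total in counts.items():
        if key.startswith(fail_prefix):
            raise RuntimeError(f"simulated failure on key {key!r}")
        output[key] = total        # write (key, sum) to the output topic
    return output
```

With the Scenario 1 input, the sum for every key is produced before the window's results are emitted, and the failure only happens while emitting, which is why the commit/processing ordering below matters.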
>
> Environment:
>
> - Run with local zk & kafka; pre-create the input & output topics with
>   1 partition.
> - samza.properties contains "task.commit.ms=2000". According to the Samza
>   docs, "this property determines how often a checkpoint is written. The
>   value is the time between checkpoints, in milliseconds". See
>   http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
>   Please see the attached Samza config file and run script.
>
> *Scenario 1: Exception in transformation*
>
> Run
>
> - Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic
> - Start the Beam app
> - Verify that the app log contains "read from topic=XXX, part=0,
>   offset=100, val: e". Because the input topic has only one partition,
>   this means all data has been read from Kafka.
> - Wait until the app terminates because of the exception thrown while
>   processing 'm'
>
> Expectation
>
> The order of processing after grouping is not specified, so some data
> could be written to the output topic before the application terminates,
> but I expect that value 'm' at offset 98 and all later records must NOT
> be marked as processed, so if I restart the Beam app, I expect it to
> throw the exception again when processing value 'm'.
>
> Comment: throwing an exception in a transformation is not a good idea,
> but such an exception could be the result of an application error. So
> the expectation is that after fixing the error and restarting the Beam
> app, it should re-process the record that caused the error.
>
> Results
>
> After I restarted the app, it did NOT re-process value 'm' and did not
> throw an exception. If I add a new value 'f' into the input topic, I see
> "read from topic=XXX, part=0, offset=101, val: f", and after some time I
> see 'm' in the output topic. So the record with value 'm' was NOT
> processed.
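[Editorial note: Lukasz's explanation above — offsets checkpointed once records reach the GBK, before the downstream transform runs — can be modeled in a few lines of plain Python. This is a toy model of Scenario 1, not Samza code; the function name and structure are illustrative:]

```python
def committed_offset_at_crash(values, fail_prefix="m"):
    """Toy model of Scenario 1: each record is buffered into the fixed
    window and its offset is checkpointed (task.commit.ms elapses) BEFORE
    the window fires and the failing transform runs. Raises at the crash
    point, reporting the offset a restarted job would resume after."""
    committed = -1
    window = []
    for offset, value in enumerate(values):
        window.append(value)   # record buffered into the fixed window
        committed = offset     # commit timer fires: offset checkpointed
    for value in window:       # window fires; downstream transform runs
        if value.startswith(fail_prefix):
            raise RuntimeError(f"failed on {value!r}, but committed={committed}")
    return committed
```

For the input 'a'..'e' with 'm' at offset 3 of this toy list, the crash happens with the last offset already committed, so a restart resumes past 'm' and never retries it — matching the observed result.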
>
> *Scenario 2: App termination*
>
> Run
>
> - Write 'g', 'h', 'i', 'j' into the input topic
> - Start the Beam app
> - Verify that the app log contains "read from topic=XXX, part=0,
>   offset=105, val: j". Because the input topic has only one partition,
>   this means all data has been read from Kafka.
> - Wait about 10 sec, then terminate the Beam app. The idea is to
>   terminate the app while 'g', 'h', 'i', 'j' are still waiting in the
>   30 sec fixed window, but after task.commit.ms=2000 has passed, so the
>   offsets are committed.
>
> Expectation
>
> As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the
> app is restarted, it again reads 'g', 'h', 'i', 'j' from the input topic
> and processes these records.
>
> Results
>
> After I restarted the app, it did NOT re-process the 'g', 'h', 'i', 'j'
> values. If I add a new value 'k' into the input topic, I see "read from
> topic=XXX, part=0, offset=106, val: k", and after some time I see 'k' in
> the output topic. So the records with values 'g', 'h', 'i', 'j' were NOT
> processed.
>
> Based on these results I am inclined to conclude that Beam with the
> Samza runner does NOT provide an 'at least once' processing guarantee.
>
> Have I missed something?
>
> ------------------
> Michael Benenson
>
> *From:* "LeVeck, Matt" <matt_lev...@intuit.com>
> *Date:* Monday, July 1, 2019 at 5:28 PM
> *To:* "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Benenson, Mikhail" <mikhail_benen...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject:* Beam/Samza Ensuring At Least Once semantics
>
> We're seeing some behavior when using Beam's KafkaIO and Samza as the
> runner that suggests checkpoints are getting committed even when an
> error is thrown in the Beam pipeline while processing a batch.
> Do you all have a recommended set of settings/patterns for using Beam
> with Samza to ensure that checkpoints are only updated after successful
> processing (i.e. the transforms succeed and the message is sent to the
> Beam pipeline's final output sink)?
>
> Our current settings for Samza are:
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.shutdown.ms=10000
> task.commit.ms=2000
>
> Nothing is specified with regard to checkpointing at the Beam level.
>
> Thanks,
> Matt
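[Editorial note: the property Matt is asking for — offsets committed only after processing fully succeeds — can be stated as a small sketch. The function and its callbacks are hypothetical, not a real Beam or Samza API; in Beam terms this coupling is what KafkaIO's commitOffsetsInFinalize() is meant to provide, which the experiments above report made no difference under the Samza runner:]

```python
def process_with_safe_commits(records, process, commit):
    """At-least-once commit ordering: commit an offset only after the
    record's processing has fully succeeded, never on a wall-clock timer.
    If process() raises, nothing past the last fully processed record is
    committed, so a restart re-reads (and may duplicate, but never drops)
    the failed record. `process` and `commit` are caller-supplied
    callbacks (illustrative only)."""
    for offset, record in enumerate(records):
        process(record)   # may raise; offset is NOT yet committed
        commit(offset)    # reached only after successful processing
```

The contrast with the timer-driven task.commit.ms behavior observed in the thread is exactly that here a failure on record N leaves the committed position at N-1.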