When you restart the application, are you resuming it from Samza's last commit?
Since the exception is thrown after the GBK (GroupByKey), all the data could be read from Kafka, forwarded to the GBK operator inside of Samza, and checkpointed in Kafka before the exception is ever thrown.

On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <mikhail_benen...@intuit.com> wrote:
> Hi
>
> I have run a few experiments to verify whether 'at least once' processing is
> guaranteed on Beam 2.13.0 with Samza Runner 1.1.0.
>
> The Beam application is a slightly modified Stream Word Count from the Beam
> examples:
>
> - read strings from the input Kafka topic, print (topic, partition,
>   offset, value)
> - convert values to pairs (value, 1)
> - group in Fixed Windows with a duration of 30 sec
> - sum per key
> - throw an exception if the key starts with 'm'
> - write (key, sum) to the output Kafka topic
>
> I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there is
> no difference in the results.
>
> Please see the source code attached.
>
> Environment:
>
> - Run with local zk & kafka, pre-create input & output topics with 1
>   partition.
> - samza.properties contains "task.commit.ms=2000". According to the Samza
>   docs, "this property determines how often a checkpoint is written. The value
>   is the time between checkpoints, in milliseconds". See
>   http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
>   Please see the Samza config file and run script attached.
>
> *Scenario 1: Exception in transformation*
>
> Run
>
> - Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic
> - start the Beam app
> - verify that the app log contains "read from topic=XXX, part=0,
>   offset=100, val: e". Because the input topic has only one partition, this
>   means all data have been read from Kafka.
> - wait until the app terminates because of the exception thrown while
>   processing 'm'
>
> Expectation
>
> The order of processing after grouping is not specified, so some data
> could be written to the output topic before the application terminates, but I
> expect that value=m with offset 98 and all later records must NOT be marked
> as processed, so if I restart the Beam app, I expect it to throw the
> exception again when processing value=m.
>
> Comment: throwing an exception in a transformation is not a good idea, but such
> an exception could be the result of an application error. So, the expectation
> is that after fixing the error and restarting the Beam app, it should process
> the record that caused the error.
>
> Results
>
> After I restarted the app, it does NOT re-process value 'm' and does not
> throw an exception. If I add a new value 'f' into the input topic, I see "read
> from topic=XXX, part=0, offset=101, val: f", and after some time I see 'f'
> in the output topic. So, the record with value 'm' is NOT processed.
>
> *Scenario 2: App termination*
>
> Run
>
> - Write 'g', 'h', 'i', 'j' into the input topic
> - start the Beam app
> - verify that the app log contains "read from topic=XXX, part=0,
>   offset=105, val: j". Because the input topic has only one partition, this
>   means that all data has been read from Kafka.
> - wait about 10 sec, then terminate the Beam app. The idea is to terminate the
>   app while 'g', 'h', 'i', 'j' are waiting in the 30 sec Fixed Windows, but
>   after task.commit.ms=2000 has passed, so offsets are committed.
>
> Expectation
>
> As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the app
> restarts, it again reads 'g', 'h', 'i', 'j' from the input topic and processes
> these records.
>
> Results
>
> After I restarted the app, it does NOT re-process the 'g', 'h', 'i', 'j' values.
> If I add a new value 'k' into the input topic, I see "read from topic=XXX,
> part=0, offset=106, val: k", and after some time I see 'k' in the output
> topic.
> So, the records with values 'g', 'h', 'i', 'j' are NOT processed.
>
> Based on these results, I'm inclined to conclude that Beam with the Samza
> runner does NOT provide an 'at least once' processing guarantee.
>
> Did I miss something?
>
> ------------------
>
> Michael Benenson
>
> *From: *"LeVeck, Matt" <matt_lev...@intuit.com>
> *Date: *Monday, July 1, 2019 at 5:28 PM
> *To: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Benenson,
> Mikhail" <mikhail_benen...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>,
> Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>,
> "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Beam/Samza Ensuring At Least Once semantics
>
> We're seeing some behavior when using Beam's KafkaIO and Samza as the
> runner that suggests checkpoints are getting committed even when an error
> gets thrown in the Beam pipeline while processing a batch. Do you all
> have a recommended set of settings/patterns for using Beam with Samza to
> ensure that checkpoints are only updated after successful processing (i.e.
> the transforms succeed and the message is sent to the Beam pipeline's final
> output sink)?
>
> Our current settings for Samza are:
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.shutdown.ms=10000
> task.commit.ms=2000
>
> Nothing is specified with regard to checkpointing at the Beam level.
>
> Thanks,
>
> Matt
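A toy model can make the GBK point at the top of this reply concrete. The following is plain Java with no Samza or Beam dependencies. Everything in it is an assumption for illustration:

- there is a single partition;
- every record is read at t = 0 and falls into one 30 sec fixed window;
- the window's output is written only when the window fires;
- a checkpoint timer (task.commit.ms=2000) advances the committed offset to whatever has been read, whether or not the window has fired, as suggested above.

The commitOnlyAfterOutput flag is a hypothetical alternative policy, not a Samza or Beam setting. An exception at window-firing time (Scenario 1) is modeled as losing every key in the window. That is more pessimistic than the real run, where some keys may already have been written.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, NOT Samza or Beam code: one Kafka partition, a periodic
 * checkpoint timer, and a 30 s fixed window in front of a GroupByKey.
 * Assumes every record is read at t = 0 and lands in the window [0, 30000),
 * whose output is written only when the window fires at t = 30000.
 */
public class CheckpointModel {
    static final long WINDOW_MS = 30_000; // Fixed Windows of 30 sec
    static final long COMMIT_MS = 2_000;  // task.commit.ms=2000

    /**
     * Offset a restarted job would resume from.
     *
     * @param commitOnlyAfterOutput hypothetical policy: advance the checkpoint
     *        only once the window holding the records has fired and been written
     */
    static long resumeOffset(long firstOffset, int count, long crashAtMs,
                             boolean commitOnlyAfterOutput) {
        long committed = firstOffset;          // checkpoint left by the previous run
        long nextToRead = firstOffset + count; // all records are read almost immediately
        // Visit every commit tick that happens strictly before the crash.
        for (long t = COMMIT_MS; t < crashAtMs; t += COMMIT_MS) {
            boolean windowFired = t >= WINDOW_MS;
            if (!commitOnlyAfterOutput || windowFired) {
                committed = nextToRead; // checkpoint everything read so far
            }
        }
        return committed;
    }

    /** Offsets that a restart skips even though they never reached the output topic. */
    static List<Long> lostOffsets(long firstOffset, int count, long crashAtMs,
                                  boolean commitOnlyAfterOutput) {
        List<Long> lost = new ArrayList<>();
        if (crashAtMs > WINDOW_MS) {
            return lost; // the window fired and was written before the crash
        }
        long resume = resumeOffset(firstOffset, count, crashAtMs, commitOnlyAfterOutput);
        for (long offset = firstOffset; offset < resume; offset++) {
            lost.add(offset);
        }
        return lost;
    }

    public static void main(String[] args) {
        // Scenario 2: 'g'..'j' at offsets 102..105, app terminated after ~10 s.
        System.out.println("timer commit:        lost " + lostOffsets(102, 4, 10_000, false));
        System.out.println("commit after output: lost " + lostOffsets(102, 4, 10_000, true));
        // Prints:
        // timer commit:        lost [102, 103, 104, 105]
        // commit after output: lost []
    }
}
```

Under the timer policy, the checkpoint moves past offsets 102..105 long before the window fires. A restart therefore resumes at 106, which is consistent with the Scenario 2 result. Gating commits on output keeps the checkpoint at 102 until the window has been written, so the same restart would re-read 'g', 'h', 'i', 'j'.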