Hi

I have run a few experiments to verify whether 'at least once' processing is 
guaranteed on Beam 2.13.0 with the Samza Runner 1.1.0.

The Beam application is a slightly modified streaming Word Count from the Beam 
examples:

  *   read strings from the input Kafka topic and print (topic, partition, 
offset, value)
  *   convert values to pairs (value, 1)
  *   group into fixed windows of 30 seconds
  *   sum per key
  *   throw an exception if the key starts with 'm'
  *   write (key, sum) to the output Kafka topic

I tried KafkaIO.read() both with and without commitOffsetsInFinalize(); there 
is no difference in the results.
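
For reference, here is a minimal sketch of the pipeline, reconstructed from the 
steps above. The topic names and bootstrap servers are placeholders, not the 
exact code from the attached OnceDemoWordCount01.java:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

public class OnceDemoSketch {
  public static void main(String[] args) {
    // runner/config flags (--runner=SamzaRunner etc.) come from the run script
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")        // placeholder
            .withTopic("input-topic")                      // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .commitOffsetsInFinalize())                    // tried with and without
        // print (topic, partition, offset, value), then emit (value, 1)
        .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            KafkaRecord<String, String> r = c.element();
            System.out.printf("read from topic=%s, part=%d, offset=%d, val: %s%n",
                r.getTopic(), r.getPartition(), r.getOffset(), r.getKV().getValue());
            c.output(KV.of(r.getKV().getValue(), 1));
          }
        }))
        // group into 30-second fixed windows and sum per key
        .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(Sum.integersPerKey())
        // simulate an application error on keys starting with 'm'
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            if (c.element().getKey().startsWith("m")) {
              throw new RuntimeException("key starts with 'm': " + c.element().getKey());
            }
            c.output(KV.of(c.element().getKey(), c.element().getValue().toString()));
          }
        }))
        // write (key, sum) to the output topic
        .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("localhost:9092")        // placeholder
            .withTopic("output-topic")                     // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));

    p.run().waitUntilFinish();
  }
}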

Please see the attached source code.

Environment:

  *   Run with local ZooKeeper & Kafka; the input & output topics are 
pre-created with 1 partition.
  *   samza.properties contains "task.commit.ms=2000". According to the Samza 
documentation, "this property determines how often a checkpoint is written. The 
value is the time between checkpoints, in milliseconds". See 
http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
 Please see the attached Samza config file and run script; the 
checkpoint-related settings are repeated just below.
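
These are the checkpoint-related Samza settings, matching those quoted in the 
message at the end of this thread:

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.shutdown.ms=10000
task.commit.ms=2000

Note that with task.commit.ms=2000 and a 30-second window, Samza can commit 
offsets roughly 15 times before a window ever fires; this is exactly the gap 
that Scenario 2 below exercises.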


Scenario 1: Exception in transformation

Run

  *   Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic (a sketch of a 
test producer follows this list)
  *   start the Beam app
  *   verify that the app log contains "read from topic=XXX, part=0, 
offset=100, val: e". Because the input topic has only one partition, this means 
all data has been read from Kafka.
  *   wait until the app terminates because of the exception thrown while 
processing 'm'
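
A minimal sketch of such a test producer, assuming a local broker on 
localhost:9092 and a hypothetical topic name "input-topic" (the actual names 
are in the attached config):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceTestValues {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // write the test values one by one; .get() waits for the broker ack
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (String v : new String[] {"a", "b", "c", "m", "d", "e"}) {
        producer.send(new ProducerRecord<>("input-topic", v)).get();  // placeholder topic
      }
    }
  }
}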

Expectation
The order of processing after grouping is not specified, so some data could be 
written to the output topic before the application terminates. However, I 
expect that value=m at offset 98 and all later records must NOT be marked as 
processed, so if I restart the Beam app, I expect it to throw the exception 
again when processing value=m.
Comment: throwing an exception in a transformation is not a good idea, but such 
an exception could be the result of an application error. So the expectation is 
that after fixing the error and restarting the Beam app, it should process the 
record that caused the error.

Results
After I restarted the app, it did NOT re-process value 'm' and did not throw an 
exception. If I add a new value 'f' into the input topic, I see "read from 
topic=XXX, part=0, offset=101, val: f", and after some time I see 'm' in the 
output topic. So the record with value 'm' was NOT re-processed after the 
restart.


Scenario 2: App termination

Run

  *   Write 'g', 'h', 'i', 'j' into the input topic
  *   start the Beam app
  *   verify that the app log contains "read from topic=XXX, part=0, 
offset=105, val: j". Because the input topic has only one partition, this means 
all data has been read from Kafka.
  *   wait about 10 sec, then terminate the Beam app. The idea is to terminate 
the app while 'g', 'h', 'i', 'j' are still waiting in the 30-second fixed 
window, but after task.commit.ms=2000 has passed, so the offsets have been 
committed.

Expectation
As records 'g', 'h', 'i', 'j' have NOT been processed, I expect that after the 
app is restarted, it again reads 'g', 'h', 'i', 'j' from the input topic and 
processes these records.

Results
After I restarted the app, it did NOT re-process the values 'g', 'h', 'i', 'j'. 
If I add a new value 'k' into the input topic, I see "read from topic=XXX, 
part=0, offset=106, val: k", and after some time I see 'k' in the output topic. 
So the records with values 'g', 'h', 'i', 'j' were NOT processed.


Based on these results, I am inclined to conclude that Beam with the Samza 
runner does NOT provide an 'at least once' processing guarantee.

Have I missed something?

------------------
Michael Benenson


From: "LeVeck, Matt" <matt_lev...@intuit.com>
Date: Monday, July 1, 2019 at 5:28 PM
To: "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Benenson, Mikhail" 
<mikhail_benen...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu 
<xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" 
<nicholas_a...@intuit.com>
Subject: Beam/Samza Ensuring At Least Once semantics

We're seeing some behavior when using Beam's KafkaIO and Samza as the runner 
that suggests checkpoints are getting committed even when an error gets thrown 
in the Beam pipeline while processing a batch.  Do you all have a recommended 
set of settings/patterns for using Beam with Samza to ensure that checkpoints 
are only updated after successful processing (i.e. the transforms succeed and 
the message is sent to the Beam pipeline's final output sink)?

Our current settings for Samza are:
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.shutdown.ms=10000
task.commit.ms=2000

Nothing is specified with regard to checkpointing at the Beam level.

Thanks,
Matt


#!/bin/bash

java -cp "target/data-strmprocess-samza-driver-0.1.0.jar:target/lib:target/lib/*" \
  com.intuit.strmprocess.once.OnceDemoWordCount01 \
  --runner=SamzaRunner \
  --jobName=driver-once \
  --jobInstance=001 \
  --maxSourceParallelism=10 \
  --samzaExecutionEnvironment=STANDALONE \
  --configFilePath=src/main/resources/samza-once-local.properties

Attachment: samza-once-local.properties
Description: samza-once-local.properties

Attachment: OnceDemoWordCount01.java
Description: OnceDemoWordCount01.java
