That seems to be an issue with how Samza commits and restarts from checkpoints,
and not with the Kafka source.

On Thu, Jul 11, 2019 at 4:44 PM Deshpande, Omkar <omkar_deshpa...@intuit.com>
wrote:

> Yes, we are resuming from Samza’s last commit. But the problem is that the
> last commit was done for data in a window that had not been completely
> processed.
>
>
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Wednesday, July 10, 2019 at 11:07 AM
> *To: *dev <d...@beam.apache.org>
> *Cc: *"LeVeck, Matt" <matt_lev...@intuit.com>, "Deshpande, Omkar" <
> omkar_deshpa...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu
> <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo,
> Nicholas" <nicholas_a...@intuit.com>, "Cesar, Scott" <
> scott_ce...@intuit.com>, "Ho, Tom" <tom...@intuit.com>, "
> dev@samza.apache.org" <dev@samza.apache.org>
> *Subject: *Re: Beam/Samza Ensuring At Least Once semantics
>
> When you restart the application, are you resuming it from Samza's last
> commit?
>
>
>
> Since the exception is thrown after the GBK, all of the data could have been
> read from Kafka, forwarded to the GBK operator inside Samza, and
> checkpointed in Kafka before the exception is ever thrown.
>
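To make that ordering concrete, here is a toy model in plain Java. It uses no Beam or Samza classes; CheckpointBeforeFailure and its methods are hypothetical names. It assumes, as described above, that a checkpoint records the next offset after everything already handed to the buffering GBK stage, and it shows the checkpoint moving past a record whose post-GBK processing later fails.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, not Samza or Beam code). Messages read from
// the source are handed to a buffering GroupByKey stage; a periodic checkpoint
// stores the next offset to read. The downstream step only runs when the
// window fires, and it throws for keys starting with 'm'.
class CheckpointBeforeFailure {
    final List<String> gbkBuffer = new ArrayList<>();
    long nextOffsetToRead;
    long checkpointedOffset;

    CheckpointBeforeFailure(long startOffset) {
        nextOffsetToRead = startOffset;
        checkpointedOffset = startOffset;
    }

    // Read one message and forward it to the GBK buffer.
    void read(String value) {
        gbkBuffer.add(value);
        nextOffsetToRead++;
    }

    // Periodic commit (every task.commit.ms in the real job): records what has
    // been read and buffered, not what has been emitted downstream.
    void checkpoint() {
        checkpointedOffset = nextOffsetToRead;
    }

    // Window fires: the post-GBK transform fails on keys starting with 'm'.
    void fireWindow() {
        for (String key : gbkBuffer) {
            if (key.startsWith("m")) {
                throw new IllegalStateException("failed on key " + key);
            }
        }
        gbkBuffer.clear();
    }

    public static void main(String[] args) {
        CheckpointBeforeFailure task = new CheckpointBeforeFailure(0);
        for (String v : new String[] {"a", "m", "b"}) {
            task.read(v);
        }
        task.checkpoint(); // runs while the 30 s window is still open
        try {
            task.fireWindow();
        } catch (IllegalStateException e) {
            // Prints "failed on key m; checkpoint already at 3"
            System.out.println(e.getMessage() + "; checkpoint already at " + task.checkpointedOffset);
        }
    }
}
```

Resuming from that checkpoint starts at offset 3, so in this model the failed 'm' record is never read again.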
>
>
> On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <
> mikhail_benen...@intuit.com> wrote:
>
> Hi
>
>
>
> I have run a few experiments to verify whether 'at least once' processing is
> guaranteed on Beam 2.13.0 with Samza Runner 1.1.0.
>
>
>
> The Beam application is a slightly modified Stream Word Count from the Beam
> examples:
>
>    - read strings from the input Kafka topic and print (topic, partition,
>    offset, value)
>    - convert values to pairs (value, 1)
>    - group into fixed windows with a duration of 30 sec
>    - sum per key
>    - throw an exception if the key starts with 'm'
>    - write (key, sum) to the output Kafka topic
>
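The core logic of that pipeline can be modelled without Beam, which helps when reasoning about when output appears. This is a sketch with hypothetical names (WindowedWordCount, sumPerWindow, formatOutput), not the attached source code. It assumes event timestamps in milliseconds and epoch-aligned 30 s fixed windows with no offset; the "key,sum" output format is also an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the test pipeline's logic (hypothetical names, no Beam
// dependency): (value, 1) pairs are assigned to 30 s fixed windows, summed per
// key within each window, and the post-GBK step throws for keys starting with 'm'.
class WindowedWordCount {
    static final long WINDOW_MS = 30_000;

    // Start of the fixed window containing a non-negative timestamp (zero offset).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Sum of the 1s per key, grouped by window start.
    static Map<Long, Map<String, Long>> sumPerWindow(List<String> values, List<Long> timestampsMs) {
        Map<Long, Map<String, Long>> sums = new TreeMap<>();
        for (int i = 0; i < values.size(); i++) {
            sums.computeIfAbsent(windowStart(timestampsMs.get(i)), w -> new TreeMap<>())
                .merge(values.get(i), 1L, Long::sum);
        }
        return sums;
    }

    // Post-GBK step of the experiment: fail on keys starting with 'm'.
    static String formatOutput(String key, long sum) {
        if (key.startsWith("m")) {
            throw new IllegalStateException("key starts with 'm': " + key);
        }
        return key + "," + sum;
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Long>> sums = sumPerWindow(
            List.of("a", "b", "a", "c"), List.of(1_000L, 29_999L, 5_000L, 30_000L));
        System.out.println(sums); // {0={a=2, b=1}, 30000={c=1}}
    }
}
```

Nothing is emitted for a window until it closes, so every record in it waits in the GBK buffer for up to 30 sec, which is much longer than the 2 sec commit interval used in the experiment.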
>
>
> I tried KafkaIO.read() both with and without commitOffsetsInFinalize();
> there is no difference in the results.
>
>
>
> Please see the attached source code.
>
>
>
> Environment:
>
>    - Run with local ZooKeeper and Kafka, with the input and output topics
>    pre-created with 1 partition each.
>    - samza.properties contains "task.commit.ms=2000". According to the Samza
>    docs, "this property determines how often a checkpoint is written. The value
>    is the time between checkpoints, in milliseconds". See
>    http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
>    Please see the attached Samza config file and run script.
>
>
>
>
>
> *Scenario 1: Exception in transformation*
>
>
>
> Run
>
>    - Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic
>    - start the Beam app
>    - verify that the app log contains "read from topic=XXX, part=0,
>    offset=100, val: e". Because the input topic has only one partition, this
>    means all data has been read from Kafka.
>    - wait until the app terminates because of the exception thrown while
>    processing 'm'
>
>
>
> Expectation
>
> The order of processing after grouping is not specified, so some data
> could be written to the output topic before the application terminates, but
> I expect that the record value=m at offset 98 and all later records are NOT
> marked as processed, so if I restart the Beam app, I expect it to throw the
> exception again when processing value=m.
>
> Comment: throwing an exception in a transformation is not a good idea, but
> such an exception could be the result of an application error. So the
> expectation is that after fixing the error and restarting the Beam app, it
> should process the record that caused the error.
>
>
>
> Results
>
> After I restarted the app, it does NOT reprocess value 'm' and does not
> throw an exception. If I add a new value 'f' to the input topic, I see "read
> from topic=XXX, part=0, offset=101, val: f", and after some time I see 'f'
> in the output topic. So the record with value 'm' is NOT processed.
>
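The gap between what was expected and what happened can be stated in offsets. The sketch below is a hypothetical helper, not part of Beam or Samza. It assumes 'a' through 'e' occupy consecutive offsets 95 to 100 (the thread only states 98 for 'm' and 100 for 'e') and, purely for illustration, that only 'a', 'b', 'c' reached the output topic before the crash; the thread does not say which records did.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for reasoning about Scenario 1 (not a Beam/Samza API).
// An at-least-once restart should begin at the lowest offset whose record has
// not reached the output sink; any such offset below the actual restart point
// is skipped for good.
class RestartOffsets {
    static long expectedRestart(long firstOffset, long endOffset, Set<Long> writtenToSink) {
        for (long o = firstOffset; o < endOffset; o++) {
            if (!writtenToSink.contains(o)) {
                return o;
            }
        }
        return endOffset;
    }

    static List<Long> skipped(long expectedRestart, long actualRestart, Set<Long> writtenToSink) {
        List<Long> lost = new ArrayList<>();
        for (long o = expectedRestart; o < actualRestart; o++) {
            if (!writtenToSink.contains(o)) {
                lost.add(o);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Assumed: 'a'..'e' are offsets 95..100 and only 95..97 were written.
        Set<Long> written = Set.of(95L, 96L, 97L);
        long expected = expectedRestart(95, 101, written); // 98, the 'm' record
        long actual = 101; // per the log, reading resumed at 'f' (offset 101)
        // Prints "expected 98, actual 101, skipped [98, 99, 100]"
        System.out.println("expected " + expected + ", actual " + actual
            + ", skipped " + skipped(expected, actual, written));
    }
}
```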
>
>
>
>
> *Scenario 2: App termination*
>
>
>
> Run
>
>    - Write 'g', 'h', 'i', 'j' into the input topic
>    - start the Beam app
>    - verify that the app log contains "read from topic=XXX, part=0,
>    offset=105, val: j". Because the input topic has only one partition, this
>    means that all data has been read from Kafka.
>    - wait about 10 sec, then terminate the Beam app. The idea is to terminate
>    the app while 'g', 'h', 'i', 'j' are waiting in the 30 sec fixed window, but
>    after task.commit.ms=2000 has elapsed, so that the offsets are committed.
>
>
>
> Expectation
>
> As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the
> app is restarted, it reads ‘g’, ‘h’, ‘i’, ‘j’ from the input topic again and
> processes these records.
>
>
>
> Results
>
> After I restarted the app, it does NOT reprocess the ‘g’, ‘h’, ‘i’, ‘j’
> values. If I add a new value ‘k’ to the input topic, I see “read from
> topic=XXX, part=0, offset=106, val: k”, and after some time I see ‘k’ in the
> output topic. So the records with values ‘g’, ‘h’, ‘i’, ‘j’ are NOT processed.
>
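The termination case is a timing question: how many checkpoints are written between reading 'g' to 'j' and terminating the app. This toy timeline (hypothetical names, plain Java) assumes commits fire every task.commit.ms after the reads and that each stores offset 106. Fixed windows are aligned to the epoch, so the real close time depends on when the records arrived; the sketch simply assumes the window was still open at termination, as the scenario intends.

```java
import java.util.ArrayList;
import java.util.List;

// Toy timeline for Scenario 2 (hypothetical model; times in ms measured from
// when 'g'..'j' were read). Each periodic commit is assumed to store the next
// offset to read (106), and the 30 s window is assumed not to have closed
// before the app was terminated.
class TerminationTimeline {
    // Commit times after the reads, up to and including the termination time.
    static List<Long> commitTimesBefore(long terminateMs, long commitIntervalMs) {
        List<Long> times = new ArrayList<>();
        for (long t = commitIntervalMs; t <= terminateMs; t += commitIntervalMs) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        long commitIntervalMs = 2_000; // task.commit.ms=2000
        long terminateMs = 10_000;     // app terminated about 10 s after the reads
        long windowEndMs = 30_000;     // assumed: window closes 30 s after the reads
        List<Long> commits = commitTimesBefore(terminateMs, commitIntervalMs);
        boolean windowFired = windowEndMs <= terminateMs;
        // Prints "commits=[2000, 4000, 6000, 8000, 10000], windowFired=false"
        System.out.println("commits=" + commits + ", windowFired=" + windowFired);
    }
}
```

Under these assumptions, five checkpoints store offset 106 while the window holding 'g' to 'j' is still open, so a restart begins at 'k'.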
>
>
>
>
> Based on these results, I’m inclined to conclude that Beam with the Samza
> runner does NOT provide an 'at least once' processing guarantee.
>
>
>
> Did I miss something?
>
>
>
> ------------------
>
> Michael Benenson
>
>
>
>
>
> *From: *"LeVeck, Matt" <matt_lev...@intuit.com>
> *Date: *Monday, July 1, 2019 at 5:28 PM
> *To: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Benenson,
> Mikhail" <mikhail_benen...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>,
> Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>,
> "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Beam/Samza Ensuring At Least Once semantics
>
>
>
> We’re seeing some behavior when using Beam’s KafkaIO with Samza as the
> runner that suggests checkpoints are getting committed even when an error
> is thrown in the Beam pipeline while processing a batch. Do you all
> have a recommended set of settings/patterns for using Beam with Samza to
> ensure that checkpoints are only updated after successful processing (i.e.
> the transforms succeed and the message is sent to the Beam pipeline’s final
> output sink)?
>
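One way to state the invariant asked for here is a low-watermark commit: never checkpoint past the oldest record whose output has not been written. The sketch below illustrates that invariant in plain Java with hypothetical names; it is not an existing Beam or Samza setting. With a GroupByKey in the pipeline, a record only counts as confirmed once its window has fired and the aggregate has been written, so the committed offset would trail the read position by roughly the contents of the open window.

```java
import java.util.TreeSet;

// Sketch of a "low-watermark" commit policy (a general pattern, not a Beam or
// Samza API; names are hypothetical). Offsets stay pending from the moment they
// are read until the sink confirms the output that depends on them; only the
// lowest pending offset is ever checkpointed, so a restart replays every record
// that was not confirmed (at-least-once, with possible duplicates).
class LowWatermarkCommitter {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextToRead;

    LowWatermarkCommitter(long startOffset) {
        nextToRead = startOffset;
    }

    // Called when a record is read from the partition (offsets arrive in order).
    void onRead(long offset) {
        pending.add(offset);
        nextToRead = offset + 1;
    }

    // Called once the output that includes this record has been written.
    void onSinkAck(long offset) {
        pending.remove(offset);
    }

    // Offset that is safe to checkpoint: the oldest unconfirmed record, or the
    // next offset to read if everything read so far has been confirmed.
    long safeCommitOffset() {
        return pending.isEmpty() ? nextToRead : pending.first();
    }

    public static void main(String[] args) {
        LowWatermarkCommitter c = new LowWatermarkCommitter(102);
        for (long o = 102; o <= 105; o++) {
            c.onRead(o); // 'g'..'j' buffered in the open window
        }
        System.out.println(c.safeCommitOffset()); // 102: nothing confirmed yet
        for (long o = 102; o <= 105; o++) {
            c.onSinkAck(o); // window fired and its output was written
        }
        System.out.println(c.safeCommitOffset()); // 106
    }
}
```

The design choice is to trade duplicates for completeness: after a crash, any record that was read but not confirmed is read again, which is exactly the 'at least once' behavior the experiments above expected.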
>
>
> Our current settings for Samza are:
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> task.shutdown.ms=10000
> task.commit.ms=2000
>
> Nothing is specified with regard to checkpointing at the Beam level.
>
>
>
> Thanks,
>
> Matt
>
>
>
>
>
