[ 
https://issues.apache.org/jira/browse/BEAM-7047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Norkin updated BEAM-7047:
------------------------------
    Description: 
On our project we are using Apache Kafka as a source for most of our Apache 
Beam pipelines.

We would like to use the manual offset commit functionality implemented by 
KafkaIO and enabled by the _commitOffsetsInFinalize_ option.

We have also written several tests that exercise and document this 
functionality and that should run during our CI process on the Direct Runner.

However, we ran into issues while implementing these tests. In particular, on 
the Direct Runner not all checkpoint marks (which in the case of KafkaIO 
represent partition offsets) are committed.

I've created a sample project, attached as an external link and as a source 
attachment to the JIRA ticket, to showcase the issue.

The result of the test execution is *_not_* deterministic. When it fails, a 
representative stack trace is as follows:
{code:java}
org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw messages are read and offsets are committed' didn't complete within 3 minutes because lambda expression in com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest: expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was <{raw_topic-1=10, raw_topic-0=10}>.

        at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
        at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
        at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
        at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
        at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
{code}
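
To make the mismatch in the trace easier to read, here is a minimal plain-Java sketch that lists the partitions whose expected offset was never committed. The class and method names (OffsetCommitDiff, uncommitted) are hypothetical and are not part of Beam, KafkaIO, or the sample project. The maps simply mirror the expected and actual values shown in the trace.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Beam or the sample project.
final class OffsetCommitDiff {

    // Returns the expected entries whose committed offset is absent or different.
    static Map<String, Long> uncommitted(Map<String, Long> expected,
                                         Map<String, Long> committed) {
        Map<String, Long> missing = new TreeMap<>();
        for (Map.Entry<String, Long> e : expected.entrySet()) {
            if (!e.getValue().equals(committed.get(e.getKey()))) {
                missing.put(e.getKey(), e.getValue());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Values copied from the "expected ... but was ..." part of the trace.
        Map<String, Long> expected = new TreeMap<>();
        expected.put("raw_topic-0", 10L);
        expected.put("raw_topic-1", 10L);
        expected.put("raw_topic-2", 10L);

        Map<String, Long> committed = new TreeMap<>();
        committed.put("raw_topic-0", 10L);
        committed.put("raw_topic-1", 10L);

        System.out.println(uncommitted(expected, committed)); // prints {raw_topic-2=10}
    }
}
```

For the failing run above, only raw_topic-2 is reported, which matches the observation that one partition's checkpoint mark was not finalized.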
 

This issue is probably not specific to KafkaIO, as the Direct Runner, when 
finalizing checkpoint marks, works against the general CheckpointMark interface.

  was:
On our project we are using Apache Kafka as a source for most of our Apache 
Beam pipelines.

We would like to use the manual offset commit functionality implemented by 
KafkaIO and enabled by the _commitOffsetsInFinalize_ option.

We have also written several tests that exercise and document this 
functionality and that should run during our CI process on the Direct Runner.

However, we ran into issues while implementing these tests. In particular, on 
the Direct Runner not all checkpoint marks (which in the case of KafkaIO 
represent partition offsets) are committed.

I've created a sample 
[project|https://github.com/marknorkin/beam-direct-runner-not-finalize-all-kafka-offsets]
to showcase the issue.

The result of the test execution is *_not_* deterministic. When it fails, a 
representative stack trace is as follows:
{code:java}
org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw messages are read and offsets are committed' didn't complete within 3 minutes because lambda expression in com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest: expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was <{raw_topic-1=10, raw_topic-0=10}>.

        at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
        at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
        at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
        at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
        at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
{code}
 

This issue is probably not specific to KafkaIO, as the Direct Runner, when 
finalizing checkpoint marks, works against the general CheckpointMark interface.


> Direct Runner loses checkpoint marks when committing offsets to Apache Kafka 
> partitions using KafkaIO
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7047
>                 URL: https://issues.apache.org/jira/browse/BEAM-7047
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-direct
>    Affects Versions: 2.9.0
>            Reporter: Mark Norkin
>            Priority: Major
>         Attachments: 
> beam-direct-runner-not-finalize-all-kafka-offsets-master.zip
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
