[ https://issues.apache.org/jira/browse/BEAM-7047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Norkin updated BEAM-7047:
------------------------------
Attachment: beam-direct-runner-not-finalize-all-kafka-offsets-master.zip
> Direct Runner loses checkpoint marks when committing offsets to Apache Kafka partitions using KafkaIO
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-7047
> URL: https://issues.apache.org/jira/browse/BEAM-7047
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-direct
> Affects Versions: 2.9.0
> Reporter: Mark Norkin
> Priority: Major
> Attachments: beam-direct-runner-not-finalize-all-kafka-offsets-master.zip
>
>
> In our project we use Apache Kafka as the source for most of our Apache
> Beam pipelines.
> We would like to rely on the manual offset commit functionality implemented
> by KafkaIO and enabled by the _commitOffsetsInFinalize_ option.
> We have also written several tests that document this functionality; they
> run during our CI process using the Direct Runner.
> However, we ran into issues while implementing these tests: on the Direct
> Runner, not all checkpoint marks (which, in the case of KafkaIO, represent
> partition offsets) are committed.
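> To make the expected behaviour concrete, here is a minimal plain-Java sketch
> with no Beam or Kafka dependency. It assumes a simplified stand-in for Beam's
> `UnboundedSource.CheckpointMark` (whose single method is `finalizeCheckpoint()`);
> the names `FinalizableMark`, `PartitionOffsetMark`, `CommitInFinalizeDemo` and the
> `committed` map are hypothetical and only loosely modeled on KafkaIO's checkpoint
> marks. It also assumes each partition held records at offsets 0 to 9, so the
> committed (next) offset is 10, as in the expected map of the stack trace. With
> _commitOffsetsInFinalize_, an offset counts as committed only once the runner
> finalizes the mark that carries it.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Beam's UnboundedSource.CheckpointMark: the runner
// calls finalizeCheckpoint() once the mark's state has been durably committed.
interface FinalizableMark {
    void finalizeCheckpoint();
}

// Hypothetical per-partition mark, loosely modeled on KafkaIO's partition marks:
// it remembers the next offset to read and commits it only when finalized.
final class PartitionOffsetMark implements FinalizableMark {
    private final String partition;            // e.g. "raw_topic-0"
    private final long nextOffset;             // offset of the next record to read
    private final Map<String, Long> committed; // stands in for Kafka's committed offsets

    PartitionOffsetMark(String partition, long nextOffset, Map<String, Long> committed) {
        this.partition = partition;
        this.nextOffset = nextOffset;
        this.committed = committed;
    }

    @Override
    public void finalizeCheckpoint() {
        // The commit happens here and nowhere else, mirroring commitOffsetsInFinalize.
        committed.put(partition, nextOffset);
    }
}

final class CommitInFinalizeDemo {
    // Creates one mark per partition after 'recordsPerPartition' records were read,
    // then finalizes only the marks whose indices appear in 'finalizedIndices'.
    static Map<String, Long> run(int partitions, long recordsPerPartition, int... finalizedIndices) {
        Map<String, Long> committed = new TreeMap<>(); // sorted for stable printing
        FinalizableMark[] marks = new FinalizableMark[partitions];
        for (int p = 0; p < partitions; p++) {
            marks[p] = new PartitionOffsetMark("raw_topic-" + p, recordsPerPartition, committed);
        }
        for (int i : finalizedIndices) {
            marks[i].finalizeCheckpoint();
        }
        return committed;
    }
}
```

> `CommitInFinalizeDemo.run(3, 10, 0, 1, 2)` returns
> `{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}`, while `run(3, 10, 0, 1)`, where
> the third mark is never finalized, returns `{raw_topic-0=10, raw_topic-1=10}`: the
> same shape as the failing assertion.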
> I've created a sample
> [project|https://github.com/marknorkin/beam-direct-runner-not-finalize-all-kafka-offsets]
> to showcase the issue.
> The result of this test execution is *_not_* deterministic; when it fails,
> a typical stack trace is as follows:
> {code:java}
> org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw
> messages are read and offsets are committed' didn't complete within 3 minutes
> because lambda expression in
> com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest:
> expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was
> <{raw_topic-1=10, raw_topic-0=10}>.
> at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
> at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
> at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
> at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
> at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
> {code}
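> The failure reduces to a map comparison: partition `raw_topic-2` never appears
> among the committed offsets. A plain-Java sketch of that check follows, with a
> simple polling loop in place of Awaitility. `OffsetCommitCheck`, `mismatched` and
> `awaitCommitted` are hypothetical names, and the supplier is an assumed stand-in
> for however the test reads committed offsets back from Kafka.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class OffsetCommitCheck {
    // Returns the partitions whose committed offset is missing or differs from the expected one.
    static Set<String> mismatched(Map<String, Long> expected, Map<String, Long> actual) {
        Set<String> result = new TreeSet<>(); // sorted for stable output
        for (Map.Entry<String, Long> e : expected.entrySet()) {
            // Long.equals(null) is false, so absent partitions are reported too.
            if (!e.getValue().equals(actual.get(e.getKey()))) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Polls the supplier until the committed offsets match 'expected' or the timeout elapses.
    static boolean awaitCommitted(Map<String, Long> expected,
                                  Supplier<Map<String, Long>> committedOffsets,
                                  long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!mismatched(expected, committedOffsets.get()).isEmpty()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out, analogous to ConditionTimeoutException
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return true;
    }
}
```

> Applied to the two maps in the stack trace, `mismatched(expected, actual)` returns
> `[raw_topic-2]`.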
>
> This issue is probably not specific to KafkaIO, because the Direct Runner
> finalizes checkpoint marks only through the general CheckpointMark interface.
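> The argument can be illustrated with a toy runner that, like the Direct Runner,
> sees only a checkpoint-mark interface and never the source behind it. This is a
> hypothetical sketch, not the Direct Runner's actual code: `Mark` stands in for
> Beam's `UnboundedSource.CheckpointMark` (without its `IOException`), and
> `ToyRunner` and `dropWithoutFinalizing` are invented names. If a runner discards a
> mark without calling `finalizeCheckpoint()`, the source-specific commit is lost no
> matter which source created the mark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beam's UnboundedSource.CheckpointMark.
interface Mark {
    void finalizeCheckpoint();
}

// Toy runner bookkeeping: it only knows the Mark interface, never the source.
final class ToyRunner {
    private final List<Mark> pending = new ArrayList<>();

    // Called when a reader produces a new checkpoint mark.
    void onCheckpoint(Mark mark) {
        pending.add(mark);
    }

    // Called once checkpointed state is durable: every outstanding mark is
    // finalized, whatever source produced it. Returns how many were finalized.
    int finalizeAll() {
        int count = 0;
        for (Mark m : pending) {
            m.finalizeCheckpoint();
            count++;
        }
        pending.clear();
        return count;
    }

    // One generic way a mark could be lost: removed (e.g. superseded by a newer
    // mark) without ever being finalized, so its commit never happens.
    void dropWithoutFinalizing(Mark mark) {
        pending.remove(mark);
    }
}
```

> With three Kafka-like marks where the third is dropped, `finalizeAll()` finalizes
> only two of them, and only two partitions end up committed.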
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)