Mark Norkin created BEAM-7047:
---------------------------------
Summary: Direct Runner loses checkpoint marks when committing
offsets to Apache Kafka partitions using KafkaIO
Key: BEAM-7047
URL: https://issues.apache.org/jira/browse/BEAM-7047
Project: Beam
Issue Type: Bug
Components: io-java-kafka, runner-direct
Affects Versions: 2.9.0
Reporter: Mark Norkin
On our project we are using Apache Kafka as a source for most of our Apache
Beam pipelines.
We would like to use the manual offset commit functionality implemented by
KafkaIO and enabled by the _commitOffsetsInFinalize_ option.
We have also written several tests that exercise and document this
functionality and are meant to run in our CI process on the Direct Runner.
However, we ran into issues while implementing these tests. In particular, we
see that on the Direct Runner not all checkpoint marks, which in the case of
KafkaIO represent partition offsets, are committed.
I've created a sample
[project|https://github.com/marknorkin/beam-direct-runner-not-finalize-all-kafka-offsets]
to showcase the issue.
The result of this test execution is *_not_* deterministic. When it fails, a
representative stack trace is as follows:
{code:java}
org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw
messages are read and offsets are committed' didn't complete within 3 minutes
because lambda expression in
com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest:
expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was
<{raw_topic-1=10, raw_topic-0=10}>.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
{code}
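To make the failure above easier to read, the comparison between expected and committed offsets can be reduced to the partitions that are missing or stale. The following is only a minimal sketch using plain Java collections. The class and method names (CommittedOffsetsDiff, missingOrStale) are hypothetical and are not part of the sample project, Awaitility, or Beam. The map contents are copied from the exception message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the sample project or of Beam.
final class CommittedOffsetsDiff {

    /**
     * Returns the expected entries whose committed offset is absent
     * or differs from the expected value, sorted by partition name.
     */
    static Map<String, Long> missingOrStale(Map<String, Long> expected,
                                            Map<String, Long> committed) {
        Map<String, Long> diff = new TreeMap<>();
        for (Map.Entry<String, Long> entry : expected.entrySet()) {
            Long actual = committed.get(entry.getKey());
            if (!entry.getValue().equals(actual)) {
                diff.put(entry.getKey(), entry.getValue());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // Values taken from the ConditionTimeoutException message.
        Map<String, Long> expected = new HashMap<>();
        expected.put("raw_topic-0", 10L);
        expected.put("raw_topic-1", 10L);
        expected.put("raw_topic-2", 10L);

        Map<String, Long> committed = new HashMap<>();
        committed.put("raw_topic-1", 10L);
        committed.put("raw_topic-0", 10L);

        System.out.println(missingOrStale(expected, committed)); // prints {raw_topic-2=10}
    }
}
```

For the failing run shown, the only difference is raw_topic-2, so the offset for one of the three partitions was never committed.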
This issue is probably not specific to KafkaIO, as the Direct Runner, when
finalizing checkpoint marks, works against the general CheckpointMark interface.
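The observed symptom can be illustrated with a small stand-alone sketch. It assumes a stand-in interface with the same shape as Beam's UnboundedSource.CheckpointMark (a single finalizeCheckpoint() method that may throw IOException). Everything else here (CheckpointFinalizeSketch, PartitionOffsetMark, finalizeAll) is hypothetical and is neither Beam's nor KafkaIO's actual code. It does not diagnose the cause. It only shows that when a mark is never finalized through the generic interface, its partition's offset is never committed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; none of these types are Beam classes.
final class CheckpointFinalizeSketch {

    /** Stand-in with the same shape as Beam's UnboundedSource.CheckpointMark. */
    interface CheckpointMark {
        void finalizeCheckpoint() throws IOException;
    }

    /** Hypothetical mark for one partition; finalizing records its offset as committed. */
    static final class PartitionOffsetMark implements CheckpointMark {
        private final String partition;
        private final long offset;
        private final Map<String, Long> committed;

        PartitionOffsetMark(String partition, long offset, Map<String, Long> committed) {
            this.partition = partition;
            this.offset = offset;
            this.committed = committed;
        }

        @Override
        public void finalizeCheckpoint() {
            committed.put(partition, offset);
        }
    }

    /** Finalizes each given mark, knowing only the generic interface. */
    static void finalizeAll(List<? extends CheckpointMark> marks) {
        for (CheckpointMark mark : marks) {
            try {
                mark.finalizeCheckpoint();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new TreeMap<>();
        List<CheckpointMark> marks = Arrays.asList(
                new PartitionOffsetMark("raw_topic-0", 10L, committed),
                new PartitionOffsetMark("raw_topic-1", 10L, committed),
                new PartitionOffsetMark("raw_topic-2", 10L, committed));

        // Simulate a runner that finalizes only two of the three marks.
        finalizeAll(marks.subList(0, 2));

        System.out.println(committed); // prints {raw_topic-0=10, raw_topic-1=10}
    }
}
```

Because the commit happens only inside finalizeCheckpoint(), any source that relies on finalization for side effects would show the same gap, which is consistent with the issue not being specific to KafkaIO.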