Anton Lytvynenko created BEAM-6991:
--------------------------------------
Summary: EOS: Streaming job fails on job restart with withEOS specified
Key: BEAM-6991
URL: https://issues.apache.org/jira/browse/BEAM-6991
Project: Beam
Issue Type: Bug
Components: runner-dataflow, runner-direct
Affects Versions: 2.11.0, 2.9.0
Reporter: Anton Lytvynenko
According to the
[documentation|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?],
the *'transactional.id'* should stay the same across producer restarts.
In Beam, the *'transactional.id'* is set internally, as follows, in
*org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter*:
{code:java}
String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
...
Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
producerConfig.putAll(
    ImmutableMap.of(
        ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
        ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
...{code}
So, to keep it consistent after a job restart (that is, identical to the value
this writer used in the previous run), I need to configure the KafkaIO writer
with a constant *'sinkGroupId'*:
{code:java}
.withEOS(numShards, "myWriterSinkGroupId");{code}
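Because the id is built only from the shard index and the *'sinkGroupId'*, a constant group id should reproduce the same *'transactional.id'* values on every run. The standalone sketch below illustrates this. It copies the format string from the excerpt above, but replaces Beam's ProducerSpEL constants with the plain Kafka config keys 'enable.idempotence' and 'transactional.id'; the class and helper names are hypothetical and are not Beam API.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransactionalIdSketch {

  // Same format string as the quoted ExactlyOnceWriter#initShardWriter excerpt.
  static String producerName(int shard, String sinkGroupId) {
    return String.format("producer_%d_for_%s", shard, sinkGroupId);
  }

  // Hypothetical helper: per-shard producer overrides, using the plain Kafka
  // config keys instead of Beam's ProducerSpEL constants.
  static Map<String, Object> producerOverrides(int shard, String sinkGroupId) {
    Map<String, Object> config = new HashMap<>();
    config.put("enable.idempotence", true);
    config.put("transactional.id", producerName(shard, sinkGroupId));
    return config;
  }

  // Transactional ids for every shard of one job run.
  static List<String> transactionalIds(int numShards, String sinkGroupId) {
    List<String> ids = new ArrayList<>();
    for (int shard = 0; shard < numShards; shard++) {
      ids.add((String) producerOverrides(shard, sinkGroupId).get("transactional.id"));
    }
    return ids;
  }

  public static void main(String[] args) {
    List<String> firstRun = transactionalIds(3, "myWriterSinkGroupId");
    List<String> restartedRun = transactionalIds(3, "myWriterSinkGroupId");
    List<String> renamedRun = transactionalIds(3, "myWriterSinkGroupId-v2");
    System.out.println(firstRun);
    System.out.println(firstRun.equals(restartedRun)); // true: same group id, same ids
    System.out.println(firstRun.equals(renamedRun));   // false: new group id, new ids
  }
}
{code}

In other words, changing *'sinkGroupId'* gives the restarted job a different set of transactional producers than the previous run used.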
However, when I cancel the job and then restart it, I get the following exception:
{code:java}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294){code}
The message essentially tells me to change *'sinkGroupId'* to a different
value, but if I do so and rerun the job, the destination topic ends up with
duplicate messages.
In other words, this breaks the exactly-once delivery guarantee.
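The dilemma can be summarized with a deliberately simplified, hypothetical model. It is not Beam's implementation: it encodes only the condition named in the exception message (metadata committed to Kafka for a shard under the sink group id, but no runner-held state for that shard), and it assumes that the Kafka metadata survives a job cancel while the runner state does not. All class, method, and enum names are invented for illustration.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class EosRestartSketch {

  // Outcomes of initializing one shard writer in this simplified model (hypothetical).
  enum InitOutcome { START_FRESH, RESUME_FROM_STATE }

  // Hypothetical stand-in for metadata committed to Kafka, keyed by
  // "<sinkGroupId>/<shard>". Assumed to survive job cancellation.
  static final Map<String, Long> committedSeqInKafka = new HashMap<>();

  static String key(String sinkGroupId, int shard) {
    return sinkGroupId + "/" + shard;
  }

  // storedSeq models the runner-held state for the shard; null after cancel + resubmit.
  static InitOutcome initShard(String sinkGroupId, int shard, Long storedSeq) {
    boolean kafkaHasMetadata = committedSeqInKafka.containsKey(key(sinkGroupId, shard));
    if (kafkaHasMetadata && storedSeq == null) {
      // The condition named in the reported exception message.
      throw new IllegalStateException(
          "Kafka metadata exists for shard " + shard + ", but there is no stored state for it.");
    }
    return storedSeq == null ? InitOutcome.START_FRESH : InitOutcome.RESUME_FROM_STATE;
  }

  public static void main(String[] args) {
    // The first run committed records for shard 0 under the constant group id.
    committedSeqInKafka.put(key("myWriterSinkGroupId", 0), 238L);

    // Restart after cancel with the same group id: runner state is gone, so the check fails.
    try {
      initShard("myWriterSinkGroupId", 0, null);
    } catch (IllegalStateException e) {
      System.out.println("restart with same id: " + e.getMessage());
    }

    // Restart with a new group id: no metadata exists, so the shard starts fresh,
    // presumably with no record of what the previous run already wrote.
    System.out.println("restart with new id: " + initShard("myWriterSinkGroupId-v2", 0, null));
  }
}
{code}

In this model, a constant group id trips the check after a cancel, while a new group id passes it only because Kafka holds no history for that id, which is consistent with the duplicates I observe.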
My project uses Beam 2.9.0 (*beam.version*), but I also tried 2.11.0 and the
behavior is the same.