Anton Lytvynenko created BEAM-6991:
--------------------------------------

             Summary: EOS: Streaming job fails on job restart with withEOS specified
                 Key: BEAM-6991
                 URL: https://issues.apache.org/jira/browse/BEAM-6991
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow, runner-direct
    Affects Versions: 2.11.0, 2.9.0
            Reporter: Anton Lytvynenko


According to the [Kafka FAQ|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?], the *'transactional.id'* should remain the same when a producer is restarted.
In Beam, the *'transactional.id'* is set internally in *org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter* as follows:

{code:java}
String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
...
Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());

producerConfig.putAll(
    ImmutableMap.of(
        ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
        ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
...{code}
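
The naming scheme above can be checked in isolation. The following is a minimal, hypothetical sketch: *TransactionalIdSketch*, *producerName* and *idsForRun* are illustrative names, not Beam APIs, and only the format string comes from the quoted snippet. It shows that each shard's *'transactional.id'* depends only on the shard index and *'sinkGroupId'*, so the ids stay the same across job runs exactly when *'sinkGroupId'* does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reproduces only the producer-name format quoted above. Not Beam code. */
final class TransactionalIdSketch {

    // Same format string as in KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter.
    static String producerName(int shard, String sinkGroupId) {
        return String.format("producer_%d_for_%s", shard, sinkGroupId);
    }

    // transactional.id values one job run would use for shards 0..numShards-1.
    static List<String> idsForRun(int numShards, String sinkGroupId) {
        List<String> ids = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            ids.add(producerName(shard, sinkGroupId));
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> firstRun = idsForRun(2, "myWriterSinkGroupId");
        List<String> restartedRun = idsForRun(2, "myWriterSinkGroupId");
        List<String> renamedRun = idsForRun(2, "myWriterSinkGroupId-v2");

        System.out.println(firstRun);
        // [producer_0_for_myWriterSinkGroupId, producer_1_for_myWriterSinkGroupId]
        System.out.println(firstRun.equals(restartedRun)); // true: same ids after restart
        System.out.println(firstRun.equals(renamedRun));   // false: different ids, unrelated producers
    }
}
```

The value myWriterSinkGroupId-v2 is an arbitrary, made-up alternative group id used only to show what happens when *'sinkGroupId'* changes.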
 

So, to keep the *'transactional.id'* consistent across restarts (that is, equal to the value this writer used in the previous run), I need to configure the KafkaIO writer with a constant *'sinkGroupId'*:

{code:java}
.withEOS(numShards, "myWriterSinkGroupId");{code}

With this configuration, when I cancel the job and then restart it, I get the following exception:
{code:java}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
 org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
 org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
 org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
 org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
 org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
 org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
 org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
 org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
 org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
 org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
 org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
 org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
 org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
 org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
 org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
 org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
 org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
 org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
 org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
 org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
 org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
 org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
 org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
 org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
 org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
 org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
 org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
 org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
 org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
 org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
 org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
 org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
 org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294)
{code}
The exception essentially tells me to change *'sinkGroupId'* to a different value, but if I do so and rerun the job, the destination topic ends up with duplicate messages.
In other words, this breaks the exactly-once delivery guarantee.
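
To illustrate why neither option works, here is a deliberately simplified, hypothetical model of the restart scenario, inferred only from the exception text and the behavior described above. It is not Beam's implementation, and *EosRestartModel*, *kafkaMetadata* and *run* are invented names. The assumptions are: Kafka keeps the last committed sequence number per *'sinkGroupId'* (a single shard here), a cancelled and resubmitted job starts with no stored state, the writer refuses to run when Kafka metadata exists but stored state does not and otherwise skips sequence numbers already committed under the same *'sinkGroupId'*, and the restarted job re-reads input that the first run already wrote.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the restart scenario from this issue (one shard).
 * NOT Beam's implementation; all names are invented for illustration.
 */
final class EosRestartModel {

    // Stand-in for metadata kept in Kafka: sinkGroupId -> last committed sequence number.
    final Map<String, Long> kafkaMetadata = new HashMap<>();
    // Stand-in for the destination topic.
    final List<String> topic = new ArrayList<>();

    /** Simulates one job run; hasStoredState is false for a cancelled-and-resubmitted job. */
    void run(String sinkGroupId, boolean hasStoredState, List<String> input) {
        Long committed = kafkaMetadata.get(sinkGroupId);
        if (committed != null && !hasStoredState) {
            // Same condition as described by the exception message in this issue.
            throw new IllegalStateException(
                "Kafka metadata exists for shard 0, but there is no stored state for it.");
        }
        long lastCommitted = (committed == null) ? -1L : committed;
        for (int seq = 0; seq < input.size(); seq++) {
            if (seq <= lastCommitted) {
                continue; // already committed under this sinkGroupId: skip it
            }
            topic.add(input.get(seq));
            lastCommitted = seq;
        }
        kafkaMetadata.put(sinkGroupId, lastCommitted);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        EosRestartModel model = new EosRestartModel();

        model.run("myWriterSinkGroupId", false, input); // first run writes a, b, c
        model.run("myWriterSinkGroupId", true, input);  // recovery with state: all skipped

        try {
            model.run("myWriterSinkGroupId", false, input); // cancel + resubmit: no state
        } catch (IllegalStateException e) {
            System.out.println("restart failed: " + e.getMessage());
        }

        model.run("myWriterSinkGroupId-v2", false, input); // new group id passes the check
        System.out.println(model.topic); // [a, b, c, a, b, c]: duplicates
    }
}
```

In this model a run with stored state writes nothing new, a resubmitted job without state fails the check, and a fresh *'sinkGroupId'* (here the made-up myWriterSinkGroupId-v2) passes the check but has no committed sequence numbers to compare against, so a, b and c land in the topic twice.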

My project sets beam.version to 2.9.0, but I also tried 2.11.0 and the behavior is the same.


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)