[ https://issues.apache.org/jira/browse/FLINK-24151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17410944#comment-17410944 ]

Arvid Heise commented on FLINK-24151:
-------------------------------------

This is a Kafka broker issue, probably the root cause of KAFKA-9310. From what 
I can see, it seems to be some kind of concurrency issue where the broker is 
simply not able to write to a transaction right after another transaction was 
committed.

We could reliably reproduce the issue with a Kafka broker running 2.4.1, but 
were unable to see the issue on 2.5.X or 2.7.X.
{noformat}
2021-09-06 16:48:14,620 DEBUG org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer [] - Change transaction id from flink-23850-2-6 to flink-23850-2-8
2021-09-06 16:48:14,620 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-flink-23850-2-2, transactionalId=flink-23850-2-2] ProducerId set to -1 with epoch -1
2021-09-06 16:48:14,622 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-flink-23850-2-2, transactionalId=flink-23850-2-2] ProducerId set to 10041 with epoch 91
2021-09-06 16:48:14,622 INFO  org.apache.flink.connector.kafka.sink.KafkaWriter [] - Created new transactional producer flink-23850-2-8
2021-09-06 16:48:14,629 INFO  org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler [] - Committing the state for checkpoint 7
2021-09-06 16:48:14,629 DEBUG org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Committing Kafka transaction flink-23850-2-7
2021-09-06 16:48:14,629 DEBUG org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer [] - commitTransaction flink-23850-2-7
...
2021-09-06 16:48:14,980 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.T1 -> Calc(select=[pk, _UTF-16LE'asd' AS EXPR$1, _UTF-16LE'foo' AS EXPR$2, _UTF-16LE'bar' AS EXPR$3]) -> NotNullEnforcer(fields=[pk]) -> Sink Sink(table=[default_catalog.default_database.T4], fields=[pk, EXPR$1, EXPR$2, EXPR$3]) (3/6)#0 (a32ab17fecae6d2f69bba01bc2c6f21c) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka flink-23850-out-0@-1 with FlinkKafkaInternalProducer{transactionalId='flink-23850-2-8', inTransaction=true, closed=false}
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:383)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
{noformat}
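
For illustration, the sequence above boils down to two transactional producers whose operations interleave: the sink starts writing under a new transactional id while the previous checkpoint's transaction is still being committed. Below is a minimal standalone sketch of that interleaving with the plain Kafka client (reconstructed for illustration, not code from the ticket; broker address and topic names are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch only: mimics the begin/commit interleaving from the log above with
// raw Kafka clients. Against a 2.4.1 broker this is the pattern we suspect of
// triggering UnknownProducerIdException; 2.5.X and 2.7.X behaved correctly.
public class TransactionInterleavingSketch {

    private static KafkaProducer<String, String> newTransactionalProducer(String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("transactional.id", transactionalId);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // obtains producerId/epoch from the coordinator
        return producer;
    }

    public static void main(String[] args) {
        // The sink rotates transactional ids per checkpoint, e.g. ...-2-7 then ...-2-8.
        KafkaProducer<String, String> previous = newTransactionalProducer("flink-23850-2-7");
        KafkaProducer<String, String> next = newTransactionalProducer("flink-23850-2-8");

        previous.beginTransaction();
        previous.send(new ProducerRecord<>("flink-23850-out", "pk", "v1"));

        // Begin the next transaction while the previous one commits, as in the log.
        next.beginTransaction();
        previous.commitTransaction();

        // On an affected broker, this append can fail with UnknownProducerIdException.
        next.send(new ProducerRecord<>("flink-23850-out", "pk", "v2"));
        next.commitTransaction();

        previous.close();
        next.close();
    }
}
{code}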

The solution is similar to KAFKA-9310: just restart. However, Flink users 
probably experience longer downtime on average because of larger applications 
and state. Hence, we will give direct feedback to upgrade Kafka to 2.5+.
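
For users who cannot upgrade the broker right away, an untested workaround implied by the ticket title (an assumption on my side, not something verified in this run) is to keep checkpoints serialized so the commit/begin overlap never occurs:
{code:java}
// Assumption, not verified here: with a single in-flight checkpoint the sink
// does not open a new transaction while the previous one is still committing.
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // instead of 2 as in the test job
{code}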

This will be solved as part of FLINK-24131.

> KafkaSink fails with setMaxConcurrentCheckpoints being enabled
> --------------------------------------------------------------
>
>                 Key: FLINK-24151
>                 URL: https://issues.apache.org/jira/browse/FLINK-24151
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Matthias
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.14.0
>
>         Attachments: release-testing-run-6.tar.gz
>
>
> We experienced a {{RuntimeException}} in a test run for FLINK-23850:
> {code}
> java.lang.RuntimeException: Failed to send data to Kafka: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
>         at org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at StreamExecCalc$6.processElement(Unknown Source) ~[?:?]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
> {code}
> Test job executed:
> {code:java}
>         Configuration config = new Configuration();
>         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
>         env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
>         env.setParallelism(6);
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.createTable("T1",
>                 TableDescriptor.forConnector("kafka")
>                         .schema(Schema.newBuilder()
>                                 .column("pk", DataTypes.STRING().notNull())
>                                 .column("x", DataTypes.STRING().notNull())
>                                 .build())
>                         .option("topic", "flink-23850-in1")
>                         .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
>                         .option("value.format", "csv")
>                         .option("scan.startup.mode", "earliest-offset")
>                         .build());
>         final Table resultTable =
>                 tableEnv.sqlQuery(
>                         "SELECT "
>                                 + "T1.pk, "
>                                 + "'asd', "
>                                 + "'foo', "
>                                 + "'bar' "
>                                 + "FROM T1");
>         tableEnv.createTable("T4",
>                 TableDescriptor.forConnector("kafka")
>                         .schema(Schema.newBuilder()
>                                 .column("pk", DataTypes.STRING().notNull())
>                                 .column("some_calculated_value", DataTypes.STRING())
>                                 .column("pk1", DataTypes.STRING())
>                                 .column("pk2", DataTypes.STRING())
>                                 .build())
>                         .option("topic", "flink-23850-out")
>                         .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
>                         .option("value.format", "csv")
>                         .option("sink.delivery-guarantee", "exactly-once")
>                         .option("sink.transactional-id-prefix", "flink-23850")
>                         .option("scan.startup.mode", "earliest-offset")
>                         .build());
>         resultTable.executeInsert("T4");
> {code}


