[ https://issues.apache.org/jira/browse/FLINK-24151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias updated FLINK-24151:
-----------------------------
Description:
We experienced a {{RuntimeException}} in a test run for FLINK-23850:
{code}
java.lang.RuntimeException: Failed to send data to Kafka: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
	at org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at StreamExecCalc$6.processElement(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
{code}
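For context: with {{sink.delivery-guarantee}} set to {{exactly-once}}, the Kafka sink opens one transaction per checkpoint, so {{setMaxConcurrentCheckpoints(2)}} allows up to two transactions to be open per sink subtask at the same time. The {{UnknownProducerIdException}} is raised broker-side once the metadata for a producerId has been dropped, e.g. because topic retention deleted the producer's last records or the broker's transactional-id expiry elapsed. A minimal sketch of the broker settings that govern this expiry (the property names are standard Kafka broker configs; the values are illustrative assumptions to make the expiry easier to hit while reproducing, not recommendations):
{code:java}
import java.util.Properties;

// Hedged sketch, not taken from the test setup: standard Kafka broker configs
// that control when the broker forgets a producerId.
Properties brokerProps = new Properties();
// How long the broker keeps metadata for an idle transactional producer
// before dropping it (Kafka's default is 7 days).
brokerProps.setProperty("transactional.id.expiration.ms", "60000");
// Broker-side cap on producer transaction timeouts; Flink's exactly-once
// sink needs its transaction.timeout.ms to stay at or below this cap.
brokerProps.setProperty("transaction.max.timeout.ms", "900000");
{code}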
Test job executed:
{code:java}
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.setParallelism(6);

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTable("T1",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("x", DataTypes.STRING().notNull())
                        .build())
                .option("topic", "flink-23850-in1")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                .option("scan.startup.mode", "earliest-offset")
                .build());

final Table resultTable =
        tableEnv.sqlQuery(
                "SELECT "
                        + "T1.pk, "
                        + "'asd', "
                        + "'foo', "
                        + "'bar' "
                        + "FROM T1");

tableEnv.createTable("T4",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("some_calculated_value", DataTypes.STRING())
                        .column("pk1", DataTypes.STRING())
                        .column("pk2", DataTypes.STRING())
                        .build())
                .option("topic", "flink-23850-out")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                .option("sink.delivery-guarantee", "exactly-once")
                .option("sink.transactional-id-prefix", "flink-23850")
                .option("scan.startup.mode", "earliest-offset")
                .build());

resultTable.executeInsert("T4");
{code}
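While the root cause is being investigated, the number of concurrent checkpoints seems worth isolating. A hedged sketch of the single change that would serialize the sink's open transactions (an assumption on our side, not a verified fix):
{code:java}
// Assumption, not verified: with exactly-once delivery the Kafka sink opens
// one transaction per checkpoint, so allowing only one in-flight checkpoint
// keeps a single open transaction per sink subtask.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
{code}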
> java.lang.RuntimeException occurs with setMaxConcurrentCheckpoints set to 2
> --------------------------------------------------------------------------------
>
> Key: FLINK-24151
> URL: https://issues.apache.org/jira/browse/FLINK-24151
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: Matthias
> Priority: Blocker
> Attachments: release-testing-run-6.tar.gz
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)