mzz created FLINK-18575:
---------------------------
Summary: Failed to send data to Kafka
Key: FLINK-18575
URL: https://issues.apache.org/jira/browse/FLINK-18575
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz
Flink version: 1.10.0
Kafka version: 2.2
*code:*
{code:java}
private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
  val kafkaPro = new Properties()
  kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
  kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
  kafkaPro.setProperty("request.timeout.ms", "10000")
  kafkaPro.setProperty("compression.type", "snappy")
  kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
  // With retries set, when a Kafka partition leader switch occurs, Flink does not restart but retries 5 times:
  kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
  val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro,
    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  aggs_result.addSink(kafka).setParallelism(parallelism)
}
{code}
*When I use this code to produce to Kafka, it reports an error:*
{code:java}
2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
    ... 8 more
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)