[
https://issues.apache.org/jira/browse/FLINK-23854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-23854:
-----------------------------------
Labels: pull-request-available release-testing (was: release-testing)
> KafkaSink error when restart from the checkpoint with a lower parallelism by
> exactly-once guarantee
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-23854
> URL: https://issues.apache.org/jira/browse/FLINK-23854
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: Hang Ruan
> Assignee: Arvid Heise
> Priority: Blocker
> Labels: pull-request-available, release-testing
>
> The KafkaSink throws an exception when restarted with a lower parallelism
> and the exactly-once guarantee. The exception is as follows.
> {noformat}
> java.lang.IllegalStateException: Internal error: It is expected that state
> from previous executions is distributed to the same subtask id.
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
>
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)
>
> at
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)
>
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
>
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.lang.NullPointerException
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
>
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
> ... 4 more
> {noformat}
> I started the Kafka cluster (kafka_2.13-2.8.0) and the Flink cluster on my
> own Mac. I changed the parallelism from 4 to 2 and restarted the job from a
> completed checkpoint. Then the error occurred.
> The CLI command and the code are as follows.
> {code:java}
> // cli command
> ./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s
> /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1
> /Users/test/project/self/target/test.jar
> {code}
> {code:java}
> public class KafkaExactlyOnceScaleDownTest {
> public static void main(String[] args) throws Exception {
> final String kafkaSourceTopic = "flinkSourceTest";
> final String kafkaSinkTopic = "flinkSinkExactlyTest1";
> final String groupId = "ExactlyOnceTest1";
> final String brokers = "localhost:9092";
> final String ckDir = "file:///Users/test/checkpointDir/" + groupId;
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60000);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointStorage(ckDir);
> env.setParallelism(4);
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setBootstrapServers(brokers)
> .setTopics(kafkaSourceTopic)
> .setGroupId(groupId)
> .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> DataStream<String> flintstones = env.fromSource(source,
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> DataStream<String> adults = flintstones.filter(s -> s != null &&
> s.length() > 2);
> Properties props = new Properties();
> props.setProperty("transaction.timeout.ms", "900000");
> adults.sinkTo(KafkaSink.<String>builder()
> .setBootstrapServers(brokers)
> .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> .setTransactionalIdPrefix("tp-test-")
> .setKafkaProducerConfig(props)
> .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new
> SimpleStringSchema()))
> .build());
> env.execute("ScaleDownTest");
> }
> static class SelfSerializationSchema implements
> KafkaRecordSerializationSchema<String> { private final
> SerializationSchema<String> valueSerialization; private String topic;
> SelfSerializationSchema(String topic, SerializationSchema<String>
> valueSerialization){ this.valueSerialization = valueSerialization; this.topic
> = topic; } @Override public void
> open(SerializationSchema.InitializationContext context, KafkaSinkContext
> sinkContext) throws Exception {
> KafkaRecordSerializationSchema.super.open(context, sinkContext); } @Override
> public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext
> kafkaSinkContext, Long aLong) { final byte[] valueSerialized =
> valueSerialization.serialize(s); return new ProducerRecord<>(topic,
> valueSerialized); } }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)