[
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
elon_X closed FLINK-35506.
--------------------------
Resolution: Not A Problem
> disable kafka auto-commit and rely on flink’s checkpointing if both are
> enabled
> -------------------------------------------------------------------------------
>
> Key: FLINK-35506
> URL: https://issues.apache.org/jira/browse/FLINK-35506
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.16.1
> Reporter: elon_X
> Priority: Major
> Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource for consuming topics and set the Kafka parameter
> {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the
> task, I notice that both will commit offsets. Should Kafka's auto-commit be
> disabled when enabling Flink checkpointing, similar to how it's done with
> FlinkKafkaConsumer?
>
> *How to reproduce*
>
> {code:java}
> // code placeholder
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.auto.commit", "true");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("fetch.min.bytes", "4096");
> kafkaParams.put("sasl.mechanism", "PLAIN");
> kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
> kafkaParams.put("bootstrap.servers", bootStrap);
> kafkaParams.put("group.id", expoGroupId);
> kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\"" + username + "\" password=\"" + password + "\";");
> KafkaSource<String> source = KafkaSource
> .<String>builder()
> .setBootstrapServers(bootStrap)
> .setProperties(kafkaParams)
> .setGroupId(expoGroupId)
> .setTopics(Arrays.asList(expoTopic))
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .setStartingOffsets(OffsetsInitializer.latest())
> .build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
> .filter(r -> true);
> env.enableCheckpointing(3000 * 1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
> env.execute("kafka-consumer"); {code}
>
>
> the kafka client's
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously
> committing offsets.
> !image-2024-06-03-23-39-28-270.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)