[
https://issues.apache.org/jira/browse/FLINK-36674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
slankka updated FLINK-36674:
----------------------------
Summary: Flink will not commit to kafka if checkpointing is not enabled
(was: Flink will not commit if checkpointing is not enabled)
> Flink will not commit to kafka if checkpointing is not enabled
> --------------------------------------------------------------
>
> Key: FLINK-36674
> URL: https://issues.apache.org/jira/browse/FLINK-36674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: slankka
> Priority: Major
>
> background: recently I found that _Flink-sql-connector-kafka-1.13.5_ kafka
> source DDL style, checkpoint is *NOT* enabled, I still can see consumer
> group defined by _properties.group.id_ values in Kafka. And when I change to
> flink-sql-connector-kafka-1.17.2, checkpoint must enabled otherwise consumer
> group will never seem. Developers offen look into consumer groups to see
> whether program works properly.
> The expected result will be Flink document, saying:
> {noformat}
> If checkpointing is not enabled, Kafka source relies on Kafka consumer’s
> internal automatic periodic offset committing logic, configured by
> enable.auto.commit and auto.commit.interval.ms in the properties of Kafka
> consumer.{noformat}
>
> The simplist kafka source DDL:
>
> {code:java}
> CREATE TABLE KafkaTable (
> `user_id` BIGINT,
> `item_id` BIGINT,
> `behavior` STRING,
> `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
> ) {code}
>
> Precondition: _group.id_ always provided.
>
> Here're my questions:
> # why the _enable.auto.commit_ behavior turned off by default (The Kafka
> officially says defaults to true if there group.id provided) which may cause
> user use must define expilicity true to the kafka source DDL. (naturely
> peolple won't set it)
> # It seems that this ticket [FLINK-20114] Fix a few KafkaSource-related bugs
> - ASF JIRA takes effects earlier than 1.13.5, what causes the commiting to
> consumer group behaivor diffierence between Flink 1.13.5 and Flink 1.17.2.
> (no checkpoint enabled)?
>
> logic related:
> [Github PR-15161|https://github.com/apache/flink/pull/15161]
>
> {code:java}
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder:
> maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
> {code}
> FlinkKafkaConsumer.java respect the default value (which is true) ,
>
> {code:java}
> @Override
> protected boolean getIsAutoCommitEnabled() {
> return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> true)
> && PropertiesUtil.getLong(
> properties,
> ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
> > 0;
> } {code}
>
>
> The OffsetCommitModes :
>
> {code:java}
> public class OffsetCommitModes {
> ...
> public static OffsetCommitMode fromConfiguration(
> boolean enableAutoCommit,
> boolean enableCommitOnCheckpoint,
> boolean enableCheckpointing) {
> if (enableCheckpointing) {
> // if checkpointing is enabled, the mode depends only on whether
> committing on
> // checkpoints is enabled
> return (enableCommitOnCheckpoint)
> ? OffsetCommitMode.ON_CHECKPOINTS
> : OffsetCommitMode.DISABLED;
> } else {
> // else, the mode depends only on whether auto committing is
> enabled in the provided
> // Kafka properties
> return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC :
> OffsetCommitMode.DISABLED;
> }
> }
> } {code}
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)