[ 
https://issues.apache.org/jira/browse/FLINK-36674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

slankka updated FLINK-36674:
----------------------------
    Summary: Flink will not commit to kafka if checkpointing is not enabled  
(was: Flink will not commit if checkpointing is not enabled)

> Flink will not commit to kafka if checkpointing is not enabled
> --------------------------------------------------------------
>
>                 Key: FLINK-36674
>                 URL: https://issues.apache.org/jira/browse/FLINK-36674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: slankka
>            Priority: Major
>
> Background: recently I found that with _flink-sql-connector-kafka-1.13.5_ 
> and a Kafka source defined in DDL style, I can still see the consumer group 
> named by _properties.group.id_ in Kafka even though checkpointing is *NOT* 
> enabled. After changing to flink-sql-connector-kafka-1.17.2, checkpointing 
> must be enabled, otherwise the consumer group never appears. Developers often 
> look at consumer groups to check whether a program is working properly.
> The expected behavior is what the Flink documentation says: 
> {noformat}
> If checkpointing is not enabled, Kafka source relies on Kafka consumer’s 
> internal automatic periodic offset committing logic, configured by 
> enable.auto.commit and auto.commit.interval.ms in the properties of Kafka 
> consumer.{noformat}
>  
> The simplest Kafka source DDL:
>  
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> ) {code}
>  
> Precondition: _group.id_ always provided.
>  
> Here are my questions:
>  # Why is _enable.auto.commit_ turned off by default? (The Kafka 
> documentation says it defaults to true if a group.id is provided.) As a 
> result, users must explicitly set it to true in the Kafka source DDL, which 
> people naturally won't do.
>  # It seems that [FLINK-20114] (Fix a few KafkaSource-related bugs) took 
> effect earlier than 1.13.5. What causes the difference in committing to the 
> consumer group between Flink 1.13.5 and Flink 1.17.2 (no checkpointing 
> enabled)?
>  
> Related logic:
> [Github PR-15161|https://github.com/apache/flink/pull/15161]
>  
> {code:java}
> // In org.apache.flink.connector.kafka.source.KafkaSourceBuilder:
> maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
> {code}
> FlinkKafkaConsumer.java respects the default value (which is true):
>  
> {code:java}
> @Override
> protected boolean getIsAutoCommitEnabled() {
>     return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
>             && PropertiesUtil.getLong(
>                             properties,
>                             ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
>                             5000)
>                     > 0;
> } {code}
>  
>  
> The OffsetCommitModes class:
>  
> {code:java}
> public class OffsetCommitModes {
> ...
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether
>             // committing on checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is
>             // enabled in the provided Kafka properties
>             return (enableAutoCommit)
>                     ? OffsetCommitMode.KAFKA_PERIODIC
>                     : OffsetCommitMode.DISABLED;
>         }
>     }
> } {code}
>  

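The KafkaSourceBuilder call quoted in the description passes "false" as the value and false as the last argument. One plausible reading is that this only installs a default and leaves a user-supplied value alone. The sketch below models that reading with plain java.util.Properties. It is not the Flink source: the class name MaybeOverrideSketch and the method body are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical, simplified model of "set a default unless the user set one".
// This is NOT copied from Flink; names and body are illustrative assumptions.
final class MaybeOverrideSketch {

    // Sets `value` for `key` when the user did not provide the key.
    // A user-provided value is replaced only when `override` is true.
    // Returns true if a user-provided value was replaced.
    static boolean maybeOverride(
            Properties props, String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null) {
            props.setProperty(key, value);
            return false;
        }
        if (override) {
            props.setProperty(key, value);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Case 1: only group.id is set, as in the DDL from the description.
        Properties defaults = new Properties();
        defaults.setProperty("group.id", "testGroup");
        maybeOverride(defaults, "enable.auto.commit", "false", false);
        System.out.println(defaults.getProperty("enable.auto.commit")); // false

        // Case 2: the user explicitly opts in; the value is kept.
        Properties explicit = new Properties();
        explicit.setProperty("group.id", "testGroup");
        explicit.setProperty("enable.auto.commit", "true");
        maybeOverride(explicit, "enable.auto.commit", "false", false);
        System.out.println(explicit.getProperty("enable.auto.commit")); // true
    }
}
```

If this model matches the real builder, then setting 'properties.enable.auto.commit' = 'true' in the DDL would correspond to case 2, and leaving it out would correspond to case 1.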

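For comparison, the two legacy snippets quoted in the description (getIsAutoCommitEnabled and OffsetCommitModes.fromConfiguration) can be combined into a small standalone model. The enum and the fromConfiguration logic follow the quoted code. The class name LegacyCommitModeSketch and the helper isAutoCommitEnabled are hypothetical, and plain string property keys stand in for the ConsumerConfig constants.

```java
import java.util.Properties;

// Standalone model of the legacy decision path quoted in the description.
// LegacyCommitModeSketch and isAutoCommitEnabled are illustrative names only.
final class LegacyCommitModeSketch {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Mirrors the quoted getIsAutoCommitEnabled(): auto commit counts as
    // enabled unless the user turned it off or set a non-positive interval.
    static boolean isAutoCommitEnabled(Properties props) {
        boolean enabled =
                Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        long intervalMs =
                Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
        return enabled && intervalMs > 0;
    }

    // Same branching as the quoted OffsetCommitModes.fromConfiguration().
    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        return enableAutoCommit
                ? OffsetCommitMode.KAFKA_PERIODIC
                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "testGroup");

        // No checkpointing and no explicit enable.auto.commit.
        System.out.println(
                fromConfiguration(isAutoCommitEnabled(props), true, false)); // KAFKA_PERIODIC

        // No checkpointing and auto commit explicitly disabled.
        props.setProperty("enable.auto.commit", "false");
        System.out.println(
                fromConfiguration(isAutoCommitEnabled(props), true, false)); // DISABLED

        // With checkpointing, the auto commit flag is ignored.
        System.out.println(fromConfiguration(false, true, true)); // ON_CHECKPOINTS
    }
}
```

Under this model, a job without checkpointing and without an explicit enable.auto.commit ends up in KAFKA_PERIODIC mode, which would be consistent with the consumer group being visible on 1.13.5.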

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
