[ https://issues.apache.org/jira/browse/FLINK-36674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

slankka updated FLINK-36674:
----------------------------
    Description: 
Background: recently I found that with the _flink-sql-connector-kafka-1.13.5_ Kafka source (DDL style) and checkpointing *NOT* enabled, I could still see the consumer group defined by the _properties.group.id_ value in Kafka. When I switched to flink-sql-connector-kafka-1.17.2, checkpointing had to be enabled, otherwise the consumer group never appeared. Developers often look at consumer groups to check whether a job is working properly.

The expected behavior, as stated in the Flink documentation, is:
{noformat}
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s 
internal automatic periodic offset committing logic, configured by 
enable.auto.commit and auto.commit.interval.ms in the properties of Kafka 
consumer.{noformat}
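To make the documented behavior concrete, here is a minimal model of periodic auto-commit. It is a sketch under simplifying assumptions, not Kafka's consumer code: the class and method names are hypothetical, the consumer is assumed to be created at t = 0, and a commit is assumed to fire only from a poll once _auto.commit.interval.ms_ has elapsed since the previous commit.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of Kafka's periodic auto-commit.
// Assumption: commits are only triggered from poll(), once the interval has elapsed.
class AutoCommitModel {
    /** Returns the poll timestamps (ms) at which an auto-commit would fire. */
    static List<Long> commitTimes(boolean enableAutoCommit, long intervalMs, long[] pollTimesMs) {
        List<Long> commits = new ArrayList<>();
        if (!enableAutoCommit) {
            return commits; // auto-commit disabled: nothing is ever committed
        }
        long deadline = intervalMs; // assume the consumer was created at t = 0
        for (long t : pollTimesMs) {
            if (t >= deadline) {
                commits.add(t);            // commit the current position during this poll
                deadline = t + intervalMs; // restart the interval after each commit
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        long[] polls = {1000, 3000, 5000, 6000, 9000, 11000};
        System.out.println(commitTimes(true, 5000, polls));
        System.out.println(commitTimes(false, 5000, polls));
    }
}
{code}

With polls at 1000, 3000, 5000, 6000, 9000 and 11000 ms and a 5000 ms interval, this model commits at 5000 and 11000 ms; with _enable.auto.commit_ set to false it never commits.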
 

The simplest Kafka source DDL:

 
{code:java}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
) {code}
 

Precondition: _group.id_ is always provided.

 

Here are my questions and research results:

1. Flink does not commit offsets because the _enable.auto.commit_ behavior is turned off by default (FLINK-20114).

Why is the _enable.auto.commit_ behavior turned off by default? (Kafka's official documentation says it defaults to true when a group.id is provided.) This means users must explicitly set it to true in the Kafka source DDL, which people naturally won't do.
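For reference, the Flink SQL Kafka connector documentation says options prefixed with {{properties.}} are passed to the Kafka client with the prefix removed. A minimal sketch of that mapping (the class and method names are hypothetical, not Flink's code) shows how the DDL above plus the explicit opt-in {{'properties.enable.auto.commit' = 'true'}} would end up as Kafka client properties:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper mirroring the documented 'properties.*' forwarding:
// keys with the prefix are passed on with the prefix stripped, others are ignored here.
class DdlPropertiesSketch {
    static final String PREFIX = "properties.";

    static Properties toKafkaProperties(Map<String, String> withOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : withOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("properties.group.id", "testGroup");
        // The explicit opt-in that users rarely add:
        options.put("properties.enable.auto.commit", "true");
        Properties kafkaProps = toKafkaProperties(options);
        System.out.println(kafkaProps.getProperty("enable.auto.commit"));
        System.out.println(kafkaProps.getProperty("topic"));
    }
}
{code}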

Answered by: [~becket_qin] 
{quote}The idea is to associate the offset commit with the Flink checkpoint to 
avoid the rewind in committed offset. So the Flink job will only commit an 
offset after a checkpoint has succeeded. Otherwise, what could happen is after 
a job failover, the committed offset may go back because the job resets to the 
last successful checkpoint. By default checkpoint is enabled in Flink. So by 
default the offsets will be committed to Kafka.

When checkpoint is disabled, to make sure of the same semantic of when 
checkpoint is enabled, by default no offset is committed unless user explicitly 
specifies to do so. But I can understand the argument that if checkpoint is 
disabled, default Kafka behavior should be honored. So both behaviors have their own point. But from a backwards compatibility perspective, I think keeping the behavior the same as {{FlinkKafkaConsumer}} makes sense.
{quote}
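A minimal simulation of the rewind scenario described in the quote. All names and offsets are hypothetical, offsets are single numbers rather than per-partition values, and the job is assumed to restart from the last completed checkpoint after a failure:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: periodic commits vs. commits tied to completed checkpoints.
class CommitRewindSketch {
    long position;       // next offset the job will read
    long lastCheckpoint; // position stored in the last completed checkpoint
    final List<Long> commits = new ArrayList<>(); // offsets written to Kafka, in order
    final boolean commitOnCheckpoint;

    CommitRewindSketch(boolean commitOnCheckpoint) {
        this.commitOnCheckpoint = commitOnCheckpoint;
    }

    void process(long records) { position += records; }

    // Periodic commit of the current position (only in the periodic mode).
    void periodicCommit() { if (!commitOnCheckpoint) commits.add(position); }

    void completeCheckpoint() {
        lastCheckpoint = position;
        if (commitOnCheckpoint) commits.add(lastCheckpoint);
    }

    void failover() { position = lastCheckpoint; } // restart from the last checkpoint

    /** True if some commit is smaller than an earlier one, i.e. the committed offset went back. */
    boolean rewound() {
        for (int i = 1; i < commits.size(); i++) {
            if (commits.get(i) < commits.get(i - 1)) return true;
        }
        return false;
    }

    static CommitRewindSketch run(boolean commitOnCheckpoint) {
        CommitRewindSketch job = new CommitRewindSketch(commitOnCheckpoint);
        job.process(50);
        job.completeCheckpoint(); // checkpoint at offset 50
        job.process(30);
        job.periodicCommit();     // position 80
        job.process(20);
        job.failover();           // crash at 100, restore to 50
        job.process(10);
        job.periodicCommit();     // position 60
        job.process(60);
        job.completeCheckpoint(); // checkpoint at 120
        return job;
    }

    public static void main(String[] args) {
        CommitRewindSketch periodic = run(false);
        CommitRewindSketch onCheckpoint = run(true);
        System.out.println("periodic: " + periodic.commits + " rewound=" + periodic.rewound());
        System.out.println("on-checkpoint: " + onCheckpoint.commits + " rewound=" + onCheckpoint.rewound());
    }
}
{code}

In this run, periodic committing writes 80 and then 60, so the committed offset goes back after the failover, while committing on checkpoints writes 50 and then 120.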
 

2. FLINK-20114 appears to fix a few KafkaSource-related bugs and took effect before 1.13.5. So what causes the difference in committing to the consumer group between Flink 1.13.5 and Flink 1.17.2 (with checkpointing disabled)?

This change seems to have been made by [FLINK-25368] Use AdminClient to get offsets rather than KafkaConsumer.

It removed the *KafkaConsumer* and replaced it with *AdminClient*.

 

So the conclusion is:

Since Flink 1.15.0, Flink will not commit to Kafka if checkpointing is not enabled.

 

Related logic:

[Github PR-15161|https://github.com/apache/flink/pull/15161]

 
{code:java}
// org.apache.flink.connector.kafka.source.KafkaSourceBuilder
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); {code}
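A simplified, hypothetical re-implementation of the {{maybeOverride}} idea, to show why the default only bites when the user says nothing. This is not the actual Flink source; I am assuming the last argument means "replace a value the user already set", and the return value here simply reports whether a user value was replaced.

{code:java}
import java.util.Properties;

// Hypothetical sketch: apply a default only when the key is absent,
// unless override is true, in which case the user's value is replaced.
class MaybeOverrideSketch {
    static boolean maybeOverride(Properties props, String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null) {
            props.setProperty(key, value); // user said nothing: apply the default
            return false;
        }
        if (override) {
            props.setProperty(key, value); // force the default over the user's value
            return true;
        }
        return false; // keep the user's value
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        maybeOverride(unset, "enable.auto.commit", "false", false);
        System.out.println(unset.getProperty("enable.auto.commit"));

        Properties optedIn = new Properties();
        optedIn.setProperty("enable.auto.commit", "true");
        maybeOverride(optedIn, "enable.auto.commit", "false", false);
        System.out.println(optedIn.getProperty("enable.auto.commit"));
    }
}
{code}

Under this reading, an explicit {{'properties.enable.auto.commit' = 'true'}} survives, while an unset key becomes "false".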
FlinkKafkaConsumer.java respects the default value (which is true):

 
{code:java}
@Override
protected boolean getIsAutoCommitEnabled() {
    return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            && PropertiesUtil.getLong(
                            properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
                    > 0;
} {code}
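The same check can be reproduced without Flink on the classpath to compare the two defaults side by side. In this sketch {{getBoolean}} and {{getLong}} are simplified stand-ins for Flink's {{PropertiesUtil}} helpers, and {{newSourceAutoCommitEnabled}} is a hypothetical helper that assumes the builder has already written "false" for a missing key, as in the {{maybeOverride}} line above:

{code:java}
import java.util.Properties;

// Standalone comparison of the two defaults using only java.util.Properties.
class AutoCommitDefaultsSketch {
    // Simplified stand-ins for Flink's PropertiesUtil helpers (no error handling).
    static boolean getBoolean(Properties p, String key, boolean dflt) {
        String v = p.getProperty(key);
        return v == null ? dflt : Boolean.parseBoolean(v);
    }

    static long getLong(Properties p, String key, long dflt) {
        String v = p.getProperty(key);
        return v == null ? dflt : Long.parseLong(v.trim());
    }

    /** FlinkKafkaConsumer-style: auto-commit is on unless the user turns it off. */
    static boolean legacyAutoCommitEnabled(Properties p) {
        return getBoolean(p, "enable.auto.commit", true)
                && getLong(p, "auto.commit.interval.ms", 5000) > 0;
    }

    /** KafkaSource-style (assumed): "false" is written first if the key is absent. */
    static boolean newSourceAutoCommitEnabled(Properties p) {
        Properties copy = new Properties();
        copy.putAll(p);
        if (copy.getProperty("enable.auto.commit") == null) {
            copy.setProperty("enable.auto.commit", "false");
        }
        return getBoolean(copy, "enable.auto.commit", true);
    }

    public static void main(String[] args) {
        Properties ddlDefaults = new Properties();
        ddlDefaults.setProperty("group.id", "testGroup");
        System.out.println(legacyAutoCommitEnabled(ddlDefaults));
        System.out.println(newSourceAutoCommitEnabled(ddlDefaults));
    }
}
{code}

With only _group.id_ set, the first check yields true and the second yields false, which is the difference in defaults I am describing.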
 

The OffsetCommitModes class:
{code:java}
public class OffsetCommitModes {
...
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {

        if (enableCheckpointing) {
            // if checkpointing is enabled, the mode depends only on whether committing on
            // checkpoints is enabled
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // else, the mode depends only on whether auto committing is enabled in the provided
            // Kafka properties
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }
} {code}
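A runnable copy of the decision logic above, with a local stand-in enum instead of Flink's {{OffsetCommitMode}}, that prints all eight input combinations:

{code:java}
// Runnable copy of OffsetCommitModes.fromConfiguration with a local enum stand-in.
class OffsetCommitModeTable {
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit, boolean enableCommitOnCheckpoint, boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing, only the commit-on-checkpoint flag matters.
            return enableCommitOnCheckpoint ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, only the Kafka enable.auto.commit value matters.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean checkpointing : values) {
            for (boolean onCheckpoint : values) {
                for (boolean autoCommit : values) {
                    System.out.printf("checkpointing=%-5b commitOnCheckpoint=%-5b autoCommit=%-5b -> %s%n",
                            checkpointing, onCheckpoint, autoCommit,
                            fromConfiguration(autoCommit, onCheckpoint, checkpointing));
                }
            }
        }
    }
}
{code}

With checkpointing disabled, only {{enableAutoCommit}} decides between KAFKA_PERIODIC and DISABLED, so the outcome depends entirely on which _enable.auto.commit_ value reaches this method.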

  was:
Background: recently I found that with the _flink-sql-connector-kafka-1.13.5_ Kafka source (DDL style) and checkpointing *NOT* enabled, I could still see the consumer group defined by the _properties.group.id_ value in Kafka. When I switched to flink-sql-connector-kafka-1.17.2, checkpointing had to be enabled, otherwise the consumer group never appeared. Developers often look at consumer groups to check whether a job is working properly.

The expected behavior, as stated in the Flink documentation, is:
{noformat}
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s 
internal automatic periodic offset committing logic, configured by 
enable.auto.commit and auto.commit.interval.ms in the properties of Kafka 
consumer.{noformat}
 

The simplest Kafka source DDL:

 
{code:java}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
) {code}
 

Precondition: _group.id_ is always provided.

 

Here are my research results:

1. Flink does not commit offsets because the _enable.auto.commit_ behavior is turned off by default (FLINK-20114).

Why is the _enable.auto.commit_ behavior turned off by default? (Kafka's official documentation says it defaults to true when a group.id is provided.) This means users must explicitly set it to true in the Kafka source DDL, which people naturally won't do.

 

 
2. FLINK-20114 appears to fix a few KafkaSource-related bugs and took effect before 1.13.5. So what causes the difference in committing to the consumer group between Flink 1.13.5 and Flink 1.17.2 (with checkpointing disabled)?

 

Related logic:

[Github PR-15161|https://github.com/apache/flink/pull/15161]

 
{code:java}
// org.apache.flink.connector.kafka.source.KafkaSourceBuilder
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); {code}
FlinkKafkaConsumer.java respects the default value (which is true):

 
{code:java}
@Override
protected boolean getIsAutoCommitEnabled() {
    return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            && PropertiesUtil.getLong(
                            properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
                    > 0;
} {code}
 

The OffsetCommitModes class:
{code:java}
public class OffsetCommitModes {
...
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {

        if (enableCheckpointing) {
            // if checkpointing is enabled, the mode depends only on whether committing on
            // checkpoints is enabled
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // else, the mode depends only on whether auto committing is enabled in the provided
            // Kafka properties
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }
} {code}


> Flink will not commit to kafka if checkpointing is not enabled
> --------------------------------------------------------------
>
>                 Key: FLINK-36674
>                 URL: https://issues.apache.org/jira/browse/FLINK-36674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.16.0, 1.17.2, 1.18.1, 1.20.0, 1.19.1
>            Reporter: slankka
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
