[
https://issues.apache.org/jira/browse/FLINK-36674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
slankka updated FLINK-36674:
----------------------------
Description:
Background: I recently found that with _flink-sql-connector-kafka-1.13.5_,
using a Kafka source defined in DDL style with checkpointing *NOT* enabled, I
can still see the consumer group defined by the _properties.group.id_ value in
Kafka. After switching to flink-sql-connector-kafka-1.17.2, checkpointing must
be enabled, otherwise the consumer group never appears. Developers often look
at consumer groups to check whether a program is working properly.
The expected behavior is what the Flink documentation says:
{noformat}
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s
internal automatic periodic offset committing logic, configured by
enable.auto.commit and auto.commit.interval.ms in the properties of Kafka
consumer.{noformat}
The simplest Kafka source DDL:
{code:sql}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)
{code}
Precondition: _group.id_ is always provided.
Here are my research results:
# Flink will not commit offsets because _enable.auto.commit_ is turned off by
default (FLINK-20114). Why is _enable.auto.commit_ turned off by default? (The
Kafka documentation says it defaults to true when a group.id is provided.) As a
result, users must explicitly set it to true in the Kafka source DDL, which
people naturally won't do.
# It seems that FLINK-20114 fixes a few KafkaSource-related bugs and took
effect earlier than 1.13.5. So what causes the difference in consumer group
commit behavior between Flink 1.13.5 and Flink 1.17.2 (with no checkpointing
enabled)?
Related logic:
[Github PR-15161|https://github.com/apache/flink/pull/15161]
{code:java}
// org.apache.flink.connector.kafka.source.KafkaSourceBuilder
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
{code}
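To make the effect of that call concrete, here is a minimal sketch of how such a "set unless the user already set it" helper could behave. It is a simplified stand-in written for this ticket, not a copy of Flink's class: the class name MaybeOverrideSketch and the helper effectiveAutoCommit are hypothetical, and the semantics of the third argument (false means a user-supplied value is kept) are an assumption based on how the call reads.

```java
import java.util.Properties;

/** Hypothetical stand-in for the builder's property handling; not Flink's actual class. */
class MaybeOverrideSketch {
    private final Properties props = new Properties();

    MaybeOverrideSketch setProperty(String key, String value) {
        props.setProperty(key, value);
        return this;
    }

    /**
     * Assumed semantics: set key to value when the user left it unset; when the
     * user did set it, replace the user's value only if override is true.
     * Returns true only when a user-supplied value was replaced.
     */
    boolean maybeOverride(String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null) {
            props.setProperty(key, value);
            return false;
        }
        if (override) {
            props.setProperty(key, value);
            return true;
        }
        return false;
    }

    String get(String key) {
        return props.getProperty(key);
    }

    /** Effective enable.auto.commit after applying the builder-style default of "false". */
    static String effectiveAutoCommit(String userValue) {
        MaybeOverrideSketch builder = new MaybeOverrideSketch();
        if (userValue != null) {
            builder.setProperty("enable.auto.commit", userValue);
        }
        builder.maybeOverride("enable.auto.commit", "false", false);
        return builder.get("enable.auto.commit");
    }

    public static void main(String[] args) {
        System.out.println("unset     -> " + effectiveAutoCommit(null));
        System.out.println("user=true -> " + effectiveAutoCommit("true"));
    }
}
```

Under these assumptions, a job that does not set the property ends up with auto commit off, while a job that sets it to true explicitly keeps its own value.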
FlinkKafkaConsumer.java respects the default value (which is true):
{code:java}
@Override
protected boolean getIsAutoCommitEnabled() {
    return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            && PropertiesUtil.getLong(
                            properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
                    > 0;
}
{code}
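The same check can be reproduced outside Flink with plain java.util.Properties. This is only a sketch: the class AutoCommitDefaultSketch and its props helper are hypothetical, the property names are written out as the string keys "enable.auto.commit" and "auto.commit.interval.ms", and the parsing is simplified compared with Flink's utilities.

```java
import java.util.Properties;

/** Hypothetical re-creation of the quoted check using only the JDK. */
class AutoCommitDefaultSketch {

    /** Builds a Properties object from alternating key/value strings. */
    static Properties props(String... keyValues) {
        Properties p = new Properties();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            p.setProperty(keyValues[i], keyValues[i + 1]);
        }
        return p;
    }

    /**
     * Auto commit counts as enabled when the flag is true (defaulting to true
     * when unset) and the commit interval is positive (defaulting to 5000 ms).
     */
    static boolean isAutoCommitEnabled(Properties properties) {
        boolean flag =
                Boolean.parseBoolean(properties.getProperty("enable.auto.commit", "true"));
        long intervalMs =
                Long.parseLong(properties.getProperty("auto.commit.interval.ms", "5000"));
        return flag && intervalMs > 0;
    }

    public static void main(String[] args) {
        System.out.println("nothing set:       " + isAutoCommitEnabled(props()));
        System.out.println("flag set to false: "
                + isAutoCommitEnabled(props("enable.auto.commit", "false")));
        System.out.println("interval set to 0: "
                + isAutoCommitEnabled(props("auto.commit.interval.ms", "0")));
    }
}
```

With nothing set, this check treats auto commit as enabled, which is the opposite of the "false" default shown in the KafkaSourceBuilder snippet.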
The OffsetCommitModes class:
{code:java}
public class OffsetCommitModes {
    ...
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // if checkpointing is enabled, the mode depends only on whether
            // committing on checkpoints is enabled
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // else, the mode depends only on whether auto committing is
            // enabled in the provided Kafka properties
            return (enableAutoCommit)
                    ? OffsetCommitMode.KAFKA_PERIODIC
                    : OffsetCommitMode.DISABLED;
        }
    }
}
{code}
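For reference, the decision above can be run standalone to print every combination. This is a sketch: the class OffsetCommitModeTable and its nested enum are hypothetical re-declarations, with only the three mode names and the branching taken from the snippet above. Whether the 1.17.2 SQL connector goes through this class at all is not established here.

```java
/** Hypothetical standalone copy of the decision logic, for experimentation only. */
class OffsetCommitModeTable {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing on, only the commit-on-checkpoint flag matters.
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        // With checkpointing off, only the Kafka auto-commit flag matters.
        return enableAutoCommit
                ? OffsetCommitMode.KAFKA_PERIODIC
                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoint=%b autoCommit=%b -> %s%n",
                            checkpointing,
                            commitOnCheckpoint,
                            autoCommit,
                            fromConfiguration(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

With checkpointing off, the result is KAFKA_PERIODIC when auto commit is true and DISABLED when it is false. That would be consistent with the observed difference if the effective value of _enable.auto.commit_ changed from true to false between the two connector versions.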
> Flink will not commit to kafka if checkpointing is not enabled
> --------------------------------------------------------------
>
> Key: FLINK-36674
> URL: https://issues.apache.org/jira/browse/FLINK-36674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.16.0, 1.17.2, 1.18.1, 1.20.0, 1.19.1
> Reporter: slankka
> Priority: Major
>