[ 
https://issues.apache.org/jira/browse/FLINK-20114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896560#comment-17896560
 ] 

slankka commented on FLINK-20114:
---------------------------------

[~becket_qin]  may I ask two questions?

Background: recently I found that with _flink-sql-connector-kafka-1.13.5_, 
using a DDL-style Kafka source with checkpointing *NOT* enabled, I can still 
see the consumer group defined by _properties.group.id_ in Kafka. After 
switching to _flink-sql-connector-kafka-1.17.2_, checkpointing must be enabled, 
otherwise the consumer group never shows up. Developers often look at consumer 
groups to check whether a program works properly.

I expected the behavior described in the Flink documentation, which says: 
{noformat}
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s 
internal automatic periodic offset committing logic, configured by 
enable.auto.commit and auto.commit.interval.ms in the properties of Kafka 
consumer.{noformat}
 

The simplest Kafka source DDL:

 
{code:sql}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
) {code}
 

Precondition: _group.id_ is always provided.

 

Here are my questions:
 # Why is _enable.auto.commit_ turned off by default? (The Kafka documentation 
says it defaults to true when a group.id is provided.) As a result, users must 
explicitly set it to true in the Kafka source DDL, which people naturally 
won't do.
 # It seems that this ticket took effect earlier than 1.13.5, so what causes the 
difference in offset-committing behavior for the consumer group between Flink 
1.13.5 and Flink 1.17.2 (with no checkpointing enabled)?
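
The explicit setting mentioned in question 1 could look roughly like the sketch below. It assumes that the SQL connector forwards _properties.*_ options to the Kafka consumer with the prefix removed, and that 1.17.2 honors a user-provided _enable.auto.commit_; I have not confirmed the latter. The helper class _KafkaDdlSketch_ is hypothetical and only renders the DDL string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Flink): renders the DDL above with optional auto commit. */
class KafkaDdlSketch {

    static String buildDdl(boolean enableAutoCommit) {
        // LinkedHashMap keeps the options in insertion order for readable output.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("properties.group.id", "testGroup");
        if (enableAutoCommit) {
            // Assumption: these are passed through to the Kafka consumer as
            // enable.auto.commit and auto.commit.interval.ms.
            options.put("properties.enable.auto.commit", "true");
            options.put("properties.auto.commit.interval.ms", "5000");
        }
        options.put("scan.startup.mode", "latest-offset");
        options.put("format", "json");

        String withClause = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));

        return "CREATE TABLE KafkaTable (\n"
                + "  `user_id` BIGINT,\n"
                + "  `item_id` BIGINT,\n"
                + "  `behavior` STRING,\n"
                + "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n"
                + ") WITH (\n"
                + withClause
                + "\n)";
    }

    public static void main(String[] args) {
        System.out.println(buildDdl(true));
    }
}
```

The resulting string can be passed to _TableEnvironment#executeSql_ in a job that has the Kafka connector on its classpath.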

 

Related logic:

[Github PR-15161|https://github.com/apache/flink/pull/15161]

 
{code:java}
// In org.apache.flink.connector.kafka.source.KafkaSourceBuilder:

maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); {code}
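
To make the effect of that line concrete, here is a simplified stand-in using only _java.util.Properties_. It is not the actual Flink source: the class _AutoCommitDefaultSketch_ and the body of _maybeOverride_ are my assumption of what "set the value unless the user already set it" means when the last argument is false.

```java
import java.util.Properties;

/** Simplified stand-in for the builder's defaulting step; not the actual Flink source. */
class AutoCommitDefaultSketch {

    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /** Sets key to value if the user left it unset, or unconditionally if override is true. */
    static void maybeOverride(Properties props, String key, String value, boolean override) {
        if (props.getProperty(key) == null || override) {
            props.setProperty(key, value);
        }
    }

    /** Returns the enable.auto.commit value the consumer would see after defaulting. */
    static String effectiveAutoCommit(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps); // copy so the caller's properties stay untouched
        maybeOverride(props, ENABLE_AUTO_COMMIT, "false", false);
        return props.getProperty(ENABLE_AUTO_COMMIT);
    }

    public static void main(String[] args) {
        Properties onlyGroupId = new Properties();
        onlyGroupId.setProperty("group.id", "testGroup");
        System.out.println("unset by user: " + effectiveAutoCommit(onlyGroupId));

        Properties explicit = new Properties();
        explicit.setProperty("group.id", "testGroup");
        explicit.setProperty(ENABLE_AUTO_COMMIT, "true");
        System.out.println("set by user:   " + effectiveAutoCommit(explicit));
    }
}
```

Under this reading, a DDL that only sets _group.id_ ends up with auto commit disabled, while an explicit user value is kept.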
FlinkKafkaConsumer.java, in contrast, respects the Kafka default value (which is true):

 
{code:java}
@Override
protected boolean getIsAutoCommitEnabled() {
    return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            && PropertiesUtil.getLong(
                            properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
                    > 0;
} {code}
 

 

The _OffsetCommitModes_ class:

 
{code:java}
public class OffsetCommitModes {
...
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {

        if (enableCheckpointing) {
            // if checkpointing is enabled, the mode depends only on whether
            // committing on checkpoints is enabled
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // else, the mode depends only on whether auto committing is enabled
            // in the provided Kafka properties
            return (enableAutoCommit)
                    ? OffsetCommitMode.KAFKA_PERIODIC
                    : OffsetCommitMode.DISABLED;
        }
    }
} {code}
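
As a worked example, the decision rule above can be copied into a standalone class and evaluated for the case I care about (no checkpointing). The class name _OffsetCommitModeTable_ and the nested _Mode_ enum are hypothetical stand-ins for the Flink types. Whether the newer KafkaSource goes through this class at all is not something I have checked; the second call only illustrates what an effective _enable.auto.commit_ of false would mean under the same rule.

```java
/** Standalone copy of the decision rule quoted above, with a local enum instead of Flink's. */
class OffsetCommitModeTable {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing, only the commit-on-checkpoint flag matters.
            return enableCommitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, only the Kafka auto-commit flag matters.
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        // No checkpointing, auto commit left at the Kafka default (true).
        System.out.println(fromConfiguration(true, true, false));
        // No checkpointing, auto commit defaulted to false.
        System.out.println(fromConfiguration(false, true, false));
        // Checkpointing enabled: the auto-commit flag is ignored.
        System.out.println(fromConfiguration(false, true, true));
    }
}
```

With checkpointing disabled, offsets reach the consumer group only when auto commit is effectively true, which matches what I observe with 1.13.5 but not with 1.17.2.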
 

 

 

 

 

> Fix a few KafkaSource-related bugs
> ----------------------------------
>
>                 Key: FLINK-20114
>                 URL: https://issues.apache.org/jira/browse/FLINK-20114
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.0
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: pull-request-available, release-testing
>             Fix For: 1.13.0, 1.12.4
>
>         Attachments: Screenshot 2020-11-24 at 15.14.35.png, first-run.tgz, 
> second-run.tgz
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-18323
> -------- 
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing, and once you are finished, please write a short summary 
> of all things you have tested.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
