[ https://issues.apache.org/jira/browse/FLINK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443896#comment-17443896 ]

liwei li edited comment on FLINK-24851 at 11/15/21, 3:25 PM:
-------------------------------------------------------------

 
{noformat}
 if there's conflict between builder-expressed logic and properties, builder 
will win finally.{noformat}
Agree with the above.
But I have a question: should the builder preset a default value for 
'auto.offset.reset' and always overwrite the properties with it? When 
the user does not explicitly call the builder's 'setStartingOffsets', 
shouldn't the value from the properties be used?



> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
>                 Key: FLINK-24851
>                 URL: https://issues.apache.org/jira/browse/FLINK-24851
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Assignee: liwei li
>            Priority: Major
>              Labels: pull-request-available
>
> Creating KafkaSource like this:
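> {code:scala}
> import java.util.Properties
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.connector.kafka.source.KafkaSource
>
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>       .setProperties(props)
>       .setTopics("topic1") // hypothetical topic name; build() requires a subscription
>       .setValueOnlyDeserializer(new SimpleStringSchema()) // build() also requires a deserializer
>       .build()
> {code}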
> The value actually used for _"auto.offset.reset"_ is *"earliest"*, not the 
> configured *"latest"*.
> This happens because _"auto.offset.reset"_ is overridden by 
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_, 
> and the default value of _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading, and it makes the Kafka connector inconvenient to 
> configure: we cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead, 
> we must extract this particular setting from the other settings and pass it to 
> _KafkaSourceBuilder.setStartingOffsets()_:
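> {code:scala}
> import java.util.Locale
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.connector.kafka.source.KafkaSource
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
> import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
>
> // `props` is the Properties object from the previous snippet
> val kafkaSource = KafkaSource.builder[String]()
>       .setProperties(props)
>       .setTopics("topic1") // hypothetical topic name
>       .setValueOnlyDeserializer(new SimpleStringSchema())
>       .setStartingOffsets(
>         // Copy auto.offset.reset from the properties into the initializer
>         OffsetsInitializer.committedOffsets(
>           OffsetResetStrategy.valueOf(
>             props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>               .asInstanceOf[String]
>               .toUpperCase(Locale.ROOT)
>           )
>         )
>       )
>       .build()
> {code}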
> The expected behavior is to use the value of _"auto.offset.reset"_ provided 
> through _KafkaSourceBuilder.setProperties()_, unless it is overridden via 
> _KafkaSourceBuilder.setStartingOffsets()_.
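The override described in this report can be reproduced without Flink or Kafka. The following is a simplified model, not Flink's actual source: it assumes that the builder copies the user-supplied properties and then unconditionally writes the strategy derived from the starting-offsets initializer, whose default is _"earliest"_. _CurrentBehaviorModel_, _effectiveConsumerProps_ and _initializerStrategy_ are hypothetical names.

```scala
import java.util.Properties

// Simplified, hypothetical model of the behavior reported for Flink 1.14.0;
// it is not the actual KafkaSourceBuilder code.
object CurrentBehaviorModel {
  val AutoOffsetReset = "auto.offset.reset"

  /** `initializerStrategy` stands in for
    * startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(). */
  def effectiveConsumerProps(
      userProps: Properties,
      initializerStrategy: String = "earliest"): Properties = {
    val merged = new Properties()
    merged.putAll(userProps) // start from the user's settings
    // Unconditional override: the user's auto.offset.reset is always replaced.
    merged.setProperty(AutoOffsetReset, initializerStrategy)
    merged
  }
}
```

With the report's _props_ (which set _"auto.offset.reset"_ to _"latest"_), _effectiveConsumerProps(props)_ returns properties in which that key is _"earliest"_, matching the reported symptom. Passing _"latest"_ as _initializerStrategy_ corresponds to the _setStartingOffsets()_ workaround, which is why that workaround restores the configured value.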



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
