[ 
https://issues.apache.org/jira/browse/FLINK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arseniy Tashoyan updated FLINK-24851:
-------------------------------------
    Description: 
Creating KafkaSource like this:


{code:scala}
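import java.util.Properties
import org.apache.flink.connector.kafka.source.KafkaSource

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
props.put("auto.offset.reset", "latest")
// Topics and deserializer are omitted for brevity; build() also requires them.
val kafkaSource = KafkaSource.builder[String]()
      .setProperties(props)
      .build()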
{code}


The value actually used for _"auto.offset.reset"_ is *"earliest"* instead of 
the configured *"latest"*.
This occurs because _"auto.offset.reset"_ gets overridden by 
_startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_. 
The default _startingOffsetsInitializer_ is _OffsetsInitializer.earliest()_, 
whose reset strategy is _"earliest"_.
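
To make the override concrete, here is a minimal, dependency-free sketch of 
the behavior described above. It is a simplified model, not the Flink source: 
the names _OverrideModel_ and _effectiveConfig_ are hypothetical, and the 
configuration is modeled as a plain Scala _Map_ rather than 
_java.util.Properties_.

{code:scala}
import java.util.Locale

// Hypothetical, simplified model of the override; not the actual Flink code.
object OverrideModel {
  val AutoOffsetReset = "auto.offset.reset"

  // Stand-in for the reset strategy of the default startingOffsetsInitializer.
  val DefaultInitializerStrategy = "EARLIEST"

  // The user's value for "auto.offset.reset" is always replaced by the
  // initializer's strategy, whether or not the user set one.
  def effectiveConfig(
      userConfig: Map[String, String],
      initializerStrategy: String = DefaultInitializerStrategy
  ): Map[String, String] =
    userConfig.updated(AutoOffsetReset, initializerStrategy.toLowerCase(Locale.ROOT))
}

val userConfig = Map(
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "group1",
  OverrideModel.AutoOffsetReset -> "latest"
)
// The configured "latest" is lost; the effective value is "earliest".
val effective = OverrideModel.effectiveConfig(userConfig)
{code}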

This behavior is misleading and makes the Kafka connector inconvenient to 
configure: the Kafka setting _"auto.offset.reset"_ cannot be used as-is. 
Instead, we must extract this particular setting from the other settings and 
pass it to _KafkaSourceBuilder.setStartingOffsets()_:


{code:scala}
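import java.util.Locale
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}

// props is the Properties instance from the first snippet. "auto.offset.reset"
// must be set; otherwise getProperty returns null and toUpperCase throws.
val kafkaSource = KafkaSource.builder[String]()
      .setProperties(props)
      .setStartingOffsets(
        OffsetsInitializer.committedOffsets(
          // Map "latest" / "earliest" / "none" to the OffsetResetStrategy enum.
          OffsetResetStrategy.valueOf(
            props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
              .toUpperCase(Locale.ROOT)
          )
        )
      )
      .build()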
{code}


The expected behavior is to use the value of _"auto.offset.reset"_ provided via 
_KafkaSourceBuilder.setProperties()_, unless it is overridden via 
_KafkaSourceBuilder.setStartingOffsets()_.
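
The expected precedence could be sketched as follows. This is only an 
illustration under stated assumptions: _ExpectedResolution_ and _resolve_ are 
hypothetical names, _explicitStrategy_ stands for a value passed via 
_setStartingOffsets()_ (empty if that method was never called), and falling 
back to _"earliest"_ when nothing is configured is an assumption that mirrors 
the current default.

{code:scala}
import java.util.Locale

// Hypothetical sketch of the expected precedence; not the actual Flink code.
object ExpectedResolution {
  val AutoOffsetReset = "auto.offset.reset"

  // 1. A strategy set explicitly via setStartingOffsets() wins.
  // 2. Otherwise the user's "auto.offset.reset" property is used.
  // 3. Otherwise fall back to "earliest" (assumed default).
  def resolve(
      userConfig: Map[String, String],
      explicitStrategy: Option[String]
  ): String =
    explicitStrategy
      .orElse(userConfig.get(AutoOffsetReset))
      .getOrElse("earliest")
      .toLowerCase(Locale.ROOT)
}

val fromProps = ExpectedResolution.resolve(Map("auto.offset.reset" -> "latest"), None)
val fromExplicit = ExpectedResolution.resolve(Map("auto.offset.reset" -> "latest"), Some("NONE"))
val fromDefault = ExpectedResolution.resolve(Map.empty, None)
{code}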

  was:
Creating KafkaSource like this:


{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
props.put("auto.offset.reset", "latest")
val kafkaSource = KafkaSource.builder[String]()
      .setProperties(props)
      .build()
{code}


The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of 
configured *"latest"*.
This occurs because _"auto.offset.reset"_ gets overridden by 
_startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_. 
The default value for startingOffsetsInitializer is _"earliest"_.

This behavior is misleading.

This behavior imposes an inconvenience on configuring the Kafka connector. We 
cannot use the Kafka settings _"auto.offset.reset"_ as-is. Instead we must 
extract this particular setting from other settings and propagate to 
_KafkaSourceBuilder.setStartingOffsets()_:


{code:scala}
val kafkaSource = KafkaSource.builder[String]()
      .setProperties(props)
      .setStartingOffsets(
        OffsetsInitializer.committedOffsets(
          OffsetResetStrategy.valueOf(
            props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
              .asInstanceOf[String]
              .toUpperCase(Locale.ROOT)
          )
        )
      )
      .build()
{code}


The expected behavior is to use the value of _"auto.offset.reset"_ provided by 
_KafkaSourceBuilder.setProperties()_ - unless overridden via 
_KafkaSourceBuilder. setStartingOffsets()_.


> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
>                 Key: FLINK-24851
>                 URL: https://issues.apache.org/jira/browse/FLINK-24851
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>       .setProperties(props)
>       .build()
> {code}
> The value actually used for _"auto.offset.reset"_ is *"earliest"* instead of 
> the configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by 
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
> The default _startingOffsetsInitializer_ is _OffsetsInitializer.earliest()_, 
> whose reset strategy is _"earliest"_.
> This behavior is misleading and makes the Kafka connector inconvenient to 
> configure: the Kafka setting _"auto.offset.reset"_ cannot be used as-is. 
> Instead, we must extract this particular setting from the other settings and 
> pass it to _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>       .setProperties(props)
>       .setStartingOffsets(
>         OffsetsInitializer.committedOffsets(
>           OffsetResetStrategy.valueOf(
>             props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>               .asInstanceOf[String]
>               .toUpperCase(Locale.ROOT)
>           )
>         )
>       )
>       .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided 
> via _KafkaSourceBuilder.setProperties()_, unless it is overridden via 
> _KafkaSourceBuilder.setStartingOffsets()_.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
