[ https://issues.apache.org/jira/browse/FLINK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arseniy Tashoyan updated FLINK-24851:
-------------------------------------
Description:
Creating KafkaSource like this:
{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
props.put("auto.offset.reset", "latest")
val kafkaSource = KafkaSource.builder[String]()
  .setProperties(props)
  .build()
{code}
The value actually used for _"auto.offset.reset"_ is *"earliest"* instead of
the configured *"latest"*.
This occurs because _"auto.offset.reset"_ gets overridden by
_startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
The default _startingOffsetsInitializer_, used when
_KafkaSourceBuilder.setStartingOffsets()_ is not called, yields _"earliest"_.
This behavior is misleading.
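The override described above can be modeled without Flink on the classpath. The following is only a simplified sketch of the reported behavior, not the connector's actual code: _OverrideSketch_, _applyBuilderOverride_ and _defaultResetStrategy_ are hypothetical names, and the default strategy is assumed to be _EARLIEST_ as stated in this report.
{code:scala}
import java.util.{Locale, Properties}

// Hypothetical stand-in for the builder's internal step; not Flink's actual code.
object OverrideSketch {
  // Assumption taken from this report: the default initializer resets to EARLIEST.
  val defaultResetStrategy: String = "EARLIEST"

  // Returns a copy of the user's properties in which "auto.offset.reset" is
  // unconditionally replaced, mirroring the behavior reported here.
  def applyBuilderOverride(userProps: Properties): Properties = {
    val effective = userProps.clone().asInstanceOf[Properties]
    effective.setProperty(
      "auto.offset.reset",
      defaultResetStrategy.toLowerCase(Locale.ROOT)
    )
    effective
  }
}
{code}
With _props_ containing _"auto.offset.reset" = "latest"_, _OverrideSketch.applyBuilderOverride(props).getProperty("auto.offset.reset")_ returns _"earliest"_: in this model the configured value never reaches the consumer.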
This behavior also makes the Kafka connector inconvenient to configure: we
cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must
extract this particular setting from the other settings and propagate it to
_KafkaSourceBuilder.setStartingOffsets()_:
{code:scala}
val kafkaSource = KafkaSource.builder[String]()
  .setProperties(props)
  .setStartingOffsets(
    OffsetsInitializer.committedOffsets(
      OffsetResetStrategy.valueOf(
        props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
          .asInstanceOf[String]
          .toUpperCase(Locale.ROOT)
      )
    )
  )
  .build()
{code}
The expected behavior is to use the value of _"auto.offset.reset"_ provided via
_KafkaSourceBuilder.setProperties()_, unless it is overridden via
_KafkaSourceBuilder.setStartingOffsets()_.
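The expected precedence can be sketched as a small resolution function. This is an illustration under stated assumptions, not a proposed patch: _ExpectedPrecedenceSketch_ and _resolveAutoOffsetReset_ are hypothetical names, _explicitStrategy_ stands in for a strategy supplied through _KafkaSourceBuilder.setStartingOffsets()_, and the final fallback to _"earliest"_ is assumed from the default described in this report.
{code:scala}
import java.util.{Locale, Properties}

// Hypothetical model of the expected precedence; not part of the Flink API.
object ExpectedPrecedenceSketch {
  // explicitStrategy models a reset strategy passed via setStartingOffsets();
  // None means setStartingOffsets() was never called.
  def resolveAutoOffsetReset(
      userProps: Properties,
      explicitStrategy: Option[String]
  ): String =
    explicitStrategy
      .map(_.toLowerCase(Locale.ROOT))                            // 1. explicit call wins
      .orElse(Option(userProps.getProperty("auto.offset.reset"))) // 2. user property
      .getOrElse("earliest")                                      // 3. assumed default
}
{code}
In this model, properties containing _"latest"_ and no explicit strategy resolve to _"latest"_, while an explicit _EARLIEST_ still takes priority over the property.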
> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
> Key: FLINK-24851
> URL: https://issues.apache.org/jira/browse/FLINK-24851
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: Arseniy Tashoyan
> Priority: Major