Efrat19 commented on code in PR #267:
URL:
https://github.com/apache/flink-connector-kafka/pull/267#discussion_r3448103737
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -382,7 +382,7 @@ public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> ra
* created.
*
* <ul>
- * <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+ * <li><code>auto.offset.reset</code> is overridden by {@link
Review Comment:
ditto
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java:
##########
@@ -462,52 +462,57 @@ protected DynamicKafkaSource<RowData> createDynamicKafkaSource(
.setDeserializer(kafkaDeserializer)
.setProperties(properties);
- switch (startupMode) {
- case EARLIEST:
- dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+ dynamicKafkaSourceBuilder.setStartingOffsets(getStartingOffsetsInitializer());
Review Comment:
This changes default behavior, which we should probably document in the release notes: the user-set `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, which has been ignored so far, will now take precedence over the reset strategy inferred by the `StartingOffsetsInitializer`.
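For the release-note entry, here is a minimal sketch of the precedence change described above, assuming the effective strategy is looked up in the consumer `Properties` first and only falls back to the strategy inferred from the startup mode when the key is absent. `ResetStrategy`, `ResetStrategyResolution`, `before` and `after` are hypothetical names for illustration, not the connector's code; only the key string `auto.offset.reset` (the value of `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`) is real.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical stand-in for Kafka's reset strategies; only these three values are modeled.
enum ResetStrategy { EARLIEST, LATEST, NONE }

final class ResetStrategyResolution {
    // Same string as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG in kafka-clients.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    private ResetStrategyResolution() {}

    // Previous behavior as described in the review: the user-set key is ignored
    // and the strategy inferred from the startup mode always wins.
    static ResetStrategy before(Properties props, ResetStrategy inferred) {
        return inferred;
    }

    // New behavior as described in the review: a user-set key takes over and the
    // inferred strategy is only a fallback. Unknown values throw IllegalArgumentException.
    static ResetStrategy after(Properties props, ResetStrategy inferred) {
        String userValue = props.getProperty(AUTO_OFFSET_RESET);
        if (userValue == null) {
            return inferred;
        }
        return ResetStrategy.valueOf(userValue.trim().toUpperCase(Locale.ROOT));
    }
}
```

Any job whose configured `auto.offset.reset` differs from the inferred strategy gets a different result from `after` than from `before`, and that silent difference is what the release note would need to call out.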
##########
docs/content.zh/docs/connectors/datastream/kafka.md:
##########
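@@ -222,7 +226,7 @@ Kafka Source supports both streaming and batch execution modes. By default, KafkaSo
For Kafka consumer configuration, refer to the [Apache Kafka documentation](http://kafka.apache.org/documentation/#consumerconfigs).
Please note that the builder will override the following configuration keys even if they are specified:
-- ```auto.offset.reset.strategy``` is overridden by OffsetsInitializer#getAutoOffsetResetStrategy()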
Review Comment:
Nit: Separate commit
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -122,6 +124,31 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
KafkaPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
}
+ /**
+ * Get an {@link OffsetsInitializer} which delegates offset initialization to the given {@link
+ * OffsetsInitializer} and uses the given {@link OffsetResetStrategy} when Kafka needs to reset
+ * an initialized starting offset.
+ *
+ * <p>The offset reset strategy is only used when the returned initializer is used to initialize
+ * starting offsets. The initialized offsets themselves are unchanged, so initializers such as
+ * {@link #earliest()} and {@link #latest()} keep their normal startup behavior.
+ *
+ * @param offsetsInitializer the initializer which resolves the starting offsets.
+ * @param offsetResetStrategy the offset reset strategy to use when the initialized starting
+ * offsets are out of range.
+ * @return an {@link OffsetsInitializer} with the given offset reset strategy.
+ */
+ static OffsetsInitializer withOffsetResetStrategy(
Review Comment:
Am I right in understanding that:
1. In dynamic/SQL mode, `properties.auto.offset.reset` is the effective configuration.
2. In DataStream mode, the same configuration (`ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`) is still ignored, and a manual `OffsetResetStrategy` can only be specified through the mutually exclusive `withOffsetResetStrategy`?
Could we use `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` in DataStream mode as well, instead?
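The `withOffsetResetStrategy` Javadoc in this hunk describes a decorator: starting offsets come from the wrapped initializer, and only the reset strategy is replaced. The sketch below shows that shape under simplifying assumptions. `SimpleOffsetsInitializer`, `ResetStrategy` and the `fixed` factory are hypothetical and keyed by partition name. The real `OffsetsInitializer` works with Kafka `TopicPartition`s and a `PartitionOffsetsRetriever`, and the PR's actual implementation may differ.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Kafka's OffsetResetStrategy; not the real type.
enum ResetStrategy { EARLIEST, LATEST, NONE }

// Hypothetical, simplified analogue of OffsetsInitializer, keyed by partition name.
interface SimpleOffsetsInitializer {

    // Resolves the starting offset for each requested partition.
    Map<String, Long> getPartitionOffsets(Collection<String> partitions);

    // The strategy Kafka falls back to when a starting offset is out of range.
    ResetStrategy getAutoOffsetResetStrategy();

    // Illustrative initializer: every partition starts at the same fixed offset.
    static SimpleOffsetsInitializer fixed(long offset, ResetStrategy strategy) {
        return new SimpleOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                Map<String, Long> offsets = new LinkedHashMap<>();
                for (String partition : partitions) {
                    offsets.put(partition, offset);
                }
                return offsets;
            }

            @Override
            public ResetStrategy getAutoOffsetResetStrategy() {
                return strategy;
            }
        };
    }

    // Decorator: delegates offset resolution and overrides only the reset strategy.
    static SimpleOffsetsInitializer withOffsetResetStrategy(
            SimpleOffsetsInitializer delegate, ResetStrategy strategy) {
        Objects.requireNonNull(delegate, "delegate");
        Objects.requireNonNull(strategy, "strategy");
        return new SimpleOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                // Starting offsets are exactly what the delegate resolves.
                return delegate.getPartitionOffsets(partitions);
            }

            @Override
            public ResetStrategy getAutoOffsetResetStrategy() {
                // Only the out-of-range fallback changes.
                return strategy;
            }
        };
    }
}
```

Wrapping `fixed(42, EARLIEST)` with `LATEST` yields the same starting offsets but reports `LATEST` as its reset strategy. This matches the Javadoc's point that `earliest()` and `latest()` keep their normal startup behavior. In DataStream mode, the reset strategy is then chosen by how the initializer is wrapped rather than by the consumer properties, which is the asymmetry the question above is about.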
##########
docs/content/docs/connectors/datastream/kafka.md:
##########
@@ -235,7 +239,7 @@ for more details.
Please note that the following keys will be overridden by the builder even if it is configured:
-- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.