Efrat19 commented on code in PR #267:
URL: https://github.com/apache/flink-connector-kafka/pull/267#discussion_r3448103737


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -382,7 +382,7 @@ public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> ra
      * created.
      *
      * <ul>
-     *   <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+     *   <li><code>auto.offset.reset</code> is overridden by {@link

Review Comment:
   ditto



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java:
##########
@@ -462,52 +462,57 @@ protected DynamicKafkaSource<RowData> createDynamicKafkaSource(
                 .setDeserializer(kafkaDeserializer)
                 .setProperties(properties);
 
-        switch (startupMode) {
-            case EARLIEST:
-                dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+        dynamicKafkaSourceBuilder.setStartingOffsets(getStartingOffsetsInitializer());

Review Comment:
   This changes default behavior, which we should probably document in the
   release notes: a user-set `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, which
   has been ignored so far, will now take precedence over the strategy inferred
   by the `StartingOffsetsInitializer`.
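
   To make the precedence change concrete, here is a minimal sketch using only
   the JDK. The class `OffsetResetPrecedence` and its `resolve` method are
   hypothetical and not part of the connector; the only name taken from Kafka
   is the property key `auto.offset.reset`, which is the value of
   `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`. It assumes the new rule is "a
   user-set value wins, otherwise fall back to the inferred strategy".

```java
import java.util.Locale;
import java.util.Properties;

public class OffsetResetPrecedence {

    // Hypothetical helper, not connector code: returns the reset strategy that
    // takes effect when a user-set "auto.offset.reset" is assumed to win over
    // the strategy inferred from the starting-offsets initializer.
    public static String resolve(Properties userProps, String inferredStrategy) {
        String userValue = userProps.getProperty("auto.offset.reset");
        if (userValue == null || userValue.trim().isEmpty()) {
            // Nothing configured by the user: keep the inferred strategy.
            return inferredStrategy;
        }
        // User configuration takes precedence; normalize case for comparison.
        return userValue.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Before the user sets anything, the inferred strategy is used.
        System.out.println(resolve(props, "earliest"));
        // Once the user sets the property, it overrides the inferred one.
        props.setProperty("auto.offset.reset", "latest");
        System.out.println(resolve(props, "earliest"));
    }
}
```

   Under the old behavior described above, the second call would still have
   yielded the inferred strategy, which is why existing jobs that carry a stale
   `auto.offset.reset` value could see a difference after upgrading.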



##########
docs/content.zh/docs/connectors/datastream/kafka.md:
##########
@@ -222,7 +226,7 @@ Kafka Source 支持流式和批式两种运行模式。默认情况下,KafkaSo
 Kafka consumer 的配置可以参考 [Apache Kafka 文档](http://kafka.apache.org/documentation/#consumerconfigs)。
 
 请注意,即使指定了以下配置项,构建器也会将其覆盖:
-- ```auto.offset.reset.strategy``` 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖

Review Comment:
   Nit: please move this docs fix into a separate commit.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -122,6 +124,31 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
                 KafkaPartitionSplit.COMMITTED_OFFSET, offsetResetStrategy);
     }
 
+    /**
+     * Get an {@link OffsetsInitializer} which delegates offset initialization to the given {@link
+     * OffsetsInitializer} and uses the given {@link OffsetResetStrategy} when Kafka needs to reset
+     * an initialized starting offset.
+     *
+     * <p>The offset reset strategy is only used when the returned initializer is used to initialize
+     * starting offsets. The initialized offsets themselves are unchanged, so initializers such as
+     * {@link #earliest()} and {@link #latest()} keep their normal startup behavior.
+     *
+     * @param offsetsInitializer the initializer which resolves the starting offsets.
+     * @param offsetResetStrategy the offset reset strategy to use when the initialized starting
+     *     offsets are out of range.
+     * @return an {@link OffsetsInitializer} with the given offset reset strategy.
+     */
+    static OffsetsInitializer withOffsetResetStrategy(

Review Comment:
   Am I right to understand that:
   1. In dynamic/SQL mode, `properties.auto.offset.reset` is the effective
      configuration.
   2. In DataStream mode, the same configuration
      (`ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`) is still ignored, and a manual
      `OffsetResetStrategy` can only be specified through the mutually exclusive
      `withOffsetResetStrategy`?

   Could we honor `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` in DataStream mode
   as well instead?
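
   For reference, the delegation described in the Javadoc above can be
   sketched roughly as follows. This is not the connector's implementation:
   `SimpleOffsetsInitializer` is a simplified, hypothetical stand-in for the
   real `OffsetsInitializer` interface (partitions are plain strings, the reset
   strategy is a plain string, and `-2L` is an arbitrary marker chosen for this
   sketch). It only illustrates that the wrapper leaves the initialized offsets
   untouched and replaces the reset strategy.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ResetStrategyDecoratorSketch {

    /** Simplified stand-in for the connector's initializer interface (assumption). */
    public interface SimpleOffsetsInitializer {
        Map<String, Long> getPartitionOffsets(Collection<String> partitions);

        String getAutoOffsetResetStrategy();
    }

    /** Stand-in "earliest" initializer: tags every partition with a marker offset. */
    public static SimpleOffsetsInitializer earliest() {
        return new SimpleOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                Map<String, Long> offsets = new HashMap<>();
                for (String partition : partitions) {
                    offsets.put(partition, -2L); // arbitrary marker for this sketch
                }
                return offsets;
            }

            @Override
            public String getAutoOffsetResetStrategy() {
                return "earliest";
            }
        };
    }

    /** Wraps a delegate: offsets come from the delegate, the reset strategy is replaced. */
    public static SimpleOffsetsInitializer withOffsetResetStrategy(
            SimpleOffsetsInitializer delegate, String resetStrategy) {
        return new SimpleOffsetsInitializer() {
            @Override
            public Map<String, Long> getPartitionOffsets(Collection<String> partitions) {
                // Starting offsets are resolved by the delegate, unchanged.
                return delegate.getPartitionOffsets(partitions);
            }

            @Override
            public String getAutoOffsetResetStrategy() {
                // Only the out-of-range reset behavior differs from the delegate.
                return resetStrategy;
            }
        };
    }

    public static void main(String[] args) {
        SimpleOffsetsInitializer wrapped = withOffsetResetStrategy(earliest(), "latest");
        System.out.println(wrapped.getPartitionOffsets(Arrays.asList("topic-0")));
        System.out.println(wrapped.getAutoOffsetResetStrategy());
    }
}
```

   A wrapper like this keeps the reset strategy an explicit, per-source choice,
   whereas reading it from the consumer properties would make the DataStream and
   SQL paths behave the same way. That trade-off is what the question above is
   about.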



##########
docs/content/docs/connectors/datastream/kafka.md:
##########
@@ -235,7 +239,7 @@ for more details.
 
 Please note that the following keys will be overridden by the builder even if
 it is configured:
-- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
