GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5056#discussion_r152763735
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute(
                        return builder();
                }
     
    +           /**
    +            * Configures the TableSource to start reading from the earliest offset for all partitions.
    +            *
    +            * @see FlinkKafkaConsumerBase#setStartFromEarliest()
    +            */
    +           public B startReadingFromEarliest() {
    +                   this.startupMode = StartupMode.EARLIEST;
    +                   this.specificStartupOffsets = null;
    +                   return builder();
    +           }
    +
    +           /**
    +            * Configures the TableSource to start reading from the latest offset for all partitions.
    +            *
    +            * @see FlinkKafkaConsumerBase#setStartFromLatest()
    +            */
    +           public B startReadingFromLatest() {
    +                   this.startupMode = StartupMode.LATEST;
    +                   this.specificStartupOffsets = null;
    +                   return builder();
    +           }
    +
    +           /**
    +            * Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
    +            *
    +            * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
    +            */
    +           public B startReadingFromGroupOffsets() {
    +                   this.startupMode = StartupMode.GROUP_OFFSETS;
    +                   this.specificStartupOffsets = null;
    +                   return builder();
    +           }
    +
    +           /**
    +            * Configures the TableSource to start reading partitions from specific offsets, set independently for each partition.
    +            *
    +            * @param specificStartupOffsets the specified offsets for partitions
    +            * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
    +            */
    +           public B startReadingFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
    --- End diff --
    
    Shorten the method name to `fromSpecificOffsets`?
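
To make the suggestion concrete, here is a minimal sketch of how the shorter names might read on a builder. Everything in it is an assumption for illustration: `OffsetBuilderSketch` is a hypothetical stand-in (not the Flink builder), partitions are keyed by plain `String` instead of `KafkaTopicPartition`, and the `SPECIFIC_OFFSETS` mode and the body of `fromSpecificOffsets` are guesses, since the diff is cut off before the method body.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the builder in the diff; NOT the actual Flink class.
class OffsetBuilderSketch {

    // SPECIFIC_OFFSETS is assumed here; the diff only shows the other three modes.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    // Initial value chosen for this sketch only.
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;

    // Keyed by a plain String (e.g. "topic-0") instead of KafkaTopicPartition,
    // so the sketch compiles without Flink on the classpath.
    private Map<String, Long> specificStartupOffsets;

    // Shortened counterpart of startReadingFromEarliest(): same assignments as in the diff.
    OffsetBuilderSketch fromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.specificStartupOffsets = null;
        return this;
    }

    // Shortened counterpart of startReadingFromSpecificOffsets(Map).
    // The body is an assumption; it copies the map so later caller changes do not leak in.
    OffsetBuilderSketch fromSpecificOffsets(Map<String, Long> offsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsets = new HashMap<>(offsets);
        return this;
    }

    StartupMode getStartupMode() {
        return startupMode;
    }

    Map<String, Long> getSpecificStartupOffsets() {
        return specificStartupOffsets;
    }
}
```

With names like these, a call site would read `builder.fromSpecificOffsets(offsets)` rather than `builder.startReadingFromSpecificOffsets(offsets)`. Whether the other three methods should be shortened the same way for consistency is left open here.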

