This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 9ffb7963e90 [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of 
KafkaSource
9ffb7963e90 is described below

commit 9ffb7963e90b7bdcfbedaf8ef9ede4e0d7089fe8
Author: fanrui <[email protected]>
AuthorDate: Fri Jul 8 13:09:22 2022 +0800

    [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
---
 docs/content.zh/docs/connectors/datastream/kafka.md                   | 4 ++--
 docs/content/docs/connectors/datastream/kafka.md                      | 4 ++--
 .../kafka/source/enumerator/initializer/OffsetsInitializer.java       | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md 
b/docs/content.zh/docs/connectors/datastream/kafka.md
index 2819e48fe4f..389b0f809bc 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -114,8 +114,8 @@ KafkaSource.builder()
     .setStartingOffsets(OffsetsInitializer.committedOffsets())
     // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
     
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-    // 从时间戳大于等于指定时间的数据开始消费
-    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
+    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
     // 从最早位点开始消费
     .setStartingOffsets(OffsetsInitializer.earliest())
     // 从最末尾位点开始消费
diff --git a/docs/content/docs/connectors/datastream/kafka.md 
b/docs/content/docs/connectors/datastream/kafka.md
index fc908de035f..c03815c83c2 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -116,8 +116,8 @@ KafkaSource.builder()
     .setStartingOffsets(OffsetsInitializer.committedOffsets())
     // Start from committed offset, also use EARLIEST as reset strategy if 
committed offset doesn't exist
     
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-    // Start from the first record whose timestamp is greater than or equals a 
timestamp
-    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // Start from the first record whose timestamp is greater than or equal to a 
timestamp (milliseconds)
+    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
     // Start from earliest offset
     .setStartingOffsets(OffsetsInitializer.earliest())
     // Start from latest offset
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 6de272da4b3..5a1bac5cf8d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -125,9 +125,9 @@ public interface OffsetsInitializer extends Serializable {
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each 
partition so that the
      * initialized offset is the offset of the first record whose record 
timestamp is greater than
-     * or equals the give timestamp.
+     * or equals the given timestamp (milliseconds).
      *
-     * @param timestamp the timestamp to start the consumption.
+     * @param timestamp the timestamp (milliseconds) at which to start consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets 
based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)

Reply via email to