This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 9ffb7963e90 [FLINK-28454][kafka][docs] Fix the wrong timestamp unit of
KafkaSource
9ffb7963e90 is described below
commit 9ffb7963e90b7bdcfbedaf8ef9ede4e0d7089fe8
Author: fanrui <[email protected]>
AuthorDate: Fri Jul 8 13:09:22 2022 +0800
[FLINK-28454][kafka][docs] Fix the wrong timestamp unit of KafkaSource
---
docs/content.zh/docs/connectors/datastream/kafka.md | 4 ++--
docs/content/docs/connectors/datastream/kafka.md | 4 ++--
.../kafka/source/enumerator/initializer/OffsetsInitializer.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md
b/docs/content.zh/docs/connectors/datastream/kafka.md
index 2819e48fe4f..389b0f809bc 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -114,8 +114,8 @@ KafkaSource.builder()
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- // 从时间戳大于等于指定时间的数据开始消费
- .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+ // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
+ .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// 从最早位点开始消费
.setStartingOffsets(OffsetsInitializer.earliest())
// 从最末尾位点开始消费
diff --git a/docs/content/docs/connectors/datastream/kafka.md
b/docs/content/docs/connectors/datastream/kafka.md
index fc908de035f..c03815c83c2 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -116,8 +116,8 @@ KafkaSource.builder()
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// Start from committed offset, also use EARLIEST as reset strategy if
committed offset doesn't exist
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- // Start from the first record whose timestamp is greater than or equals a
timestamp
- .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+ // Start from the first record whose timestamp is greater than or equals a
timestamp (milliseconds)
+ .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// Start from earliest offset
.setStartingOffsets(OffsetsInitializer.earliest())
// Start from latest offset
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 6de272da4b3..5a1bac5cf8d 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -125,9 +125,9 @@ public interface OffsetsInitializer extends Serializable {
/**
* Get an {@link OffsetsInitializer} which initializes the offsets in each
partition so that the
* initialized offset is the offset of the first record whose record
timestamp is greater than
- * or equals the give timestamp.
+ * or equals the give timestamp (milliseconds).
*
- * @param timestamp the timestamp to start the consumption.
+ * @param timestamp the timestamp (milliseconds) to start the consumption.
* @return an {@link OffsetsInitializer} which initializes the offsets
based on the given
* timestamp.
* @see KafkaAdminClient#listOffsets(Map)