This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new d18a700 [FLINK-20500][upsert-kafka] Fix unstable temporal join test
d18a700 is described below
commit d18a700c98b3b90737d3490ae2711cd72d62a95d
Author: Shengkai <[email protected]>
AuthorDate: Wed Jan 20 20:03:51 2021 +0800
[FLINK-20500][upsert-kafka] Fix unstable temporal join test
This closes #14701
---
.../kafka/table/UpsertKafkaTableITCase.java | 39 ++++++++++++----------
1 file changed, 21 insertions(+), 18 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index cbf5ff2..fc486a8 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -497,40 +497,43 @@ public class UpsertKafkaTableITCase extends
KafkaTestBaseWithFlink {
LocalDateTime.parse("2020-08-16T00:01:05")),
changelogRow(
"+U",
- 103L,
- "Richard",
- "London",
- LocalDateTime.parse("2020-08-16T01:01:05")),
- changelogRow(
- "+U",
101L,
"Alice",
"Wuhan",
LocalDateTime.parse("2020-08-16T00:02:00")),
changelogRow(
"+U",
- 101L,
- "Alice",
- "Hangzhou",
- LocalDateTime.parse("2020-08-16T01:04:05")),
- changelogRow(
- "+U",
104L,
"Tomato",
"Hongkong",
LocalDateTime.parse("2020-08-16T00:05:05")),
changelogRow(
"+U",
- 104L,
- "Tomato",
- "Shenzhen",
- LocalDateTime.parse("2020-08-16T01:05:05")),
- changelogRow(
- "+U",
105L,
"Tim",
"Shenzhen",
LocalDateTime.parse("2020-08-16T00:06:00")),
+ // Keep the record timestamps in ascending order so that the
+ // records within each Kafka partition are also in order.
+ // Adjusting the watermark strategy would have the same effect.
+ changelogRow(
+ "+U",
+ 103L,
+ "Richard",
+ "London",
+ LocalDateTime.parse("2020-08-16T01:01:05")),
+ changelogRow(
+ "+U",
+ 101L,
+ "Alice",
+ "Hangzhou",
+ LocalDateTime.parse("2020-08-16T01:04:05")),
+ changelogRow(
+ "+U",
+ 104L,
+ "Tomato",
+ "Shenzhen",
+ LocalDateTime.parse("2020-08-16T01:05:05")),
changelogRow(
"+U",
105L,