This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new d18a700  [FLINK-20500][upsert-kafka] Fix unstable temporal join test
d18a700 is described below

commit d18a700c98b3b90737d3490ae2711cd72d62a95d
Author: Shengkai <[email protected]>
AuthorDate: Wed Jan 20 20:03:51 2021 +0800

    [FLINK-20500][upsert-kafka] Fix unstable temporal join test
    
    This closes #14701
---
 .../kafka/table/UpsertKafkaTableITCase.java        | 39 ++++++++++++----------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index cbf5ff2..fc486a8 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -497,40 +497,43 @@ public class UpsertKafkaTableITCase extends 
KafkaTestBaseWithFlink {
                                 LocalDateTime.parse("2020-08-16T00:01:05")),
                         changelogRow(
                                 "+U",
-                                103L,
-                                "Richard",
-                                "London",
-                                LocalDateTime.parse("2020-08-16T01:01:05")),
-                        changelogRow(
-                                "+U",
                                 101L,
                                 "Alice",
                                 "Wuhan",
                                 LocalDateTime.parse("2020-08-16T00:02:00")),
                         changelogRow(
                                 "+U",
-                                101L,
-                                "Alice",
-                                "Hangzhou",
-                                LocalDateTime.parse("2020-08-16T01:04:05")),
-                        changelogRow(
-                                "+U",
                                 104L,
                                 "Tomato",
                                 "Hongkong",
                                 LocalDateTime.parse("2020-08-16T00:05:05")),
                         changelogRow(
                                 "+U",
-                                104L,
-                                "Tomato",
-                                "Shenzhen",
-                                LocalDateTime.parse("2020-08-16T01:05:05")),
-                        changelogRow(
-                                "+U",
                                 105L,
                                 "Tim",
                                 "Shenzhen",
                                 LocalDateTime.parse("2020-08-16T00:06:00")),
+                        // Keep the timestamp in the records are in the 
ascending order.
+                        // It will keep the records in the kafka partition are 
in the order.
+                        // It has the same effects by adjusting the watermark 
strategy.
+                        changelogRow(
+                                "+U",
+                                103L,
+                                "Richard",
+                                "London",
+                                LocalDateTime.parse("2020-08-16T01:01:05")),
+                        changelogRow(
+                                "+U",
+                                101L,
+                                "Alice",
+                                "Hangzhou",
+                                LocalDateTime.parse("2020-08-16T01:04:05")),
+                        changelogRow(
+                                "+U",
+                                104L,
+                                "Tomato",
+                                "Shenzhen",
+                                LocalDateTime.parse("2020-08-16T01:05:05")),
                         changelogRow(
                                 "+U",
                                 105L,

Reply via email to