This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new beb08a2 [FLINK-20429][kafka] Using proper watermark interval in
KafkaTableITCase#testKafkaTemporalJoinChangelog test
beb08a2 is described below
commit beb08a29d5eeb1ee075132597f89270d524db152
Author: Leonard Xu <[email protected]>
AuthorDate: Mon Dec 7 20:33:28 2020 +0800
[FLINK-20429][kafka] Using proper watermark interval in
KafkaTableITCase#testKafkaTemporalJoinChangelog test
This closes #14267
---
.../flink/streaming/connectors/kafka/table/KafkaTableITCase.java | 7 ++++---
.../flink-connector-kafka/src/test/resources/product_changelog.txt | 3 ++-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index dc0b17b..2aa56ed 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -551,7 +551,7 @@ public class KafkaTableITCase extends
KafkaTestBaseWithFlink {
" order_time TIMESTAMP(3),\n" +
" quantity INT,\n" +
" purchaser STRING,\n" +
- " WATERMARK FOR order_time AS
order_time\n" +
+ " WATERMARK FOR order_time AS
order_time - INTERVAL '1' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
@@ -572,7 +572,8 @@ public class KafkaTableITCase extends
KafkaTestBaseWithFlink {
+ "('o_003', 'p_001', TIMESTAMP '2020-10-01
12:00:00', 2, 'Tom'),"
+ "('o_004', 'p_002', TIMESTAMP '2020-10-01
12:00:00', 2, 'King'),"
+ "('o_005', 'p_001', TIMESTAMP '2020-10-01
18:00:00', 10, 'Leonard'),"
- + "('o_006', 'p_002', TIMESTAMP '2020-10-01
18:00:00', 10, 'Leonard')";
+ + "('o_006', 'p_002', TIMESTAMP '2020-10-01
18:00:00', 10, 'Leonard'),"
+ + "('o_007', 'p_002', TIMESTAMP '2020-10-01
18:00:01', 10, 'Robinson')"; // used to advance watermark
tEnv.executeSql(orderInitialValues).await();
// create product table and set initial values
@@ -583,7 +584,7 @@ public class KafkaTableITCase extends
KafkaTestBaseWithFlink {
" product_price DECIMAL(10,
4),\n" +
" update_time TIMESTAMP(3)
METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" PRIMARY KEY(product_id) NOT
ENFORCED,\n" +
- " WATERMARK FOR update_time AS
update_time\n" +
+ " WATERMARK FOR update_time AS
update_time - INTERVAL '1' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
diff --git
a/flink-connectors/flink-connector-kafka/src/test/resources/product_changelog.txt
b/flink-connectors/flink-connector-kafka/src/test/resources/product_changelog.txt
index cf3e727..530c4de 100644
---
a/flink-connectors/flink-connector-kafka/src/test/resources/product_changelog.txt
+++
b/flink-connectors/flink-connector-kafka/src/test/resources/product_changelog.txt
@@ -3,4 +3,5 @@
{"before":{"product_id":"p_001","product_name":"scooter","product_price":11.11},"after":{"product_id":"p_001","product_name":"scooter","product_price":12.99},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601553600000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1601553603456,"transaction":null}
{"before":{"product_id":"p_002","product_name":"basketball","product_price":23.11},"after":{"product_id":"p_002","product_name":"basketball","product_price":19.99},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601553600000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1601553603456,"transaction":null}
{"before":{"product_id":"p_001","product_name":"scooter","product_price":12.99},"after":{"product_id":"p_001","product_name":"scooter","product_price":11.99},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601575200000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1601575203456,"transaction":null}
-{"before":{"product_id":"p_002","product_name":"basketball","product_price":19.99},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601575200000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1601575203456,"transaction":null}
\ No newline at end of file
+{"before":{"product_id":"p_002","product_name":"basketball","product_price":19.99},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601575200000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1601575203456,"transaction":null}
+{"before":{"product_id":"p_001","product_name":"scooter","product_price":11.99},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1601576200000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1601575203456,"transaction":null}
\ No newline at end of file