leonardBang commented on a change in pull request #14160:
URL: https://github.com/apache/flink/pull/14160#discussion_r530101454
##########
File path: docs/dev/table/connectors/formats/canal.zh.md
##########
@@ -219,6 +219,17 @@ Format 参数
</tbody>
</table>
+注意事项
+----------------
+
+### 重复的变更事件
+
+在正常的操作环境下,Canal 应用能以 **exactly-once** 的语义投递每条变更事件。在这种情况下,Flink 消费 Canal
产生的变更事件能够工作地很好。
Review comment:
```suggestion
在正常的操作环境下,Canal 应用能以 **exactly-once** 的语义投递每条变更事件。在这种情况下,Flink 消费 Canal
产生的变更事件能够工作得很好。
```
##########
File path: docs/dev/table/connectors/formats/debezium.md
##########
@@ -282,6 +282,14 @@ Use format `debezium-avro-confluent` to interpret Debezium
Avro messages and for
Caveats
----------------
+### Duplicate change events
+
+Under normal operating scenarios, the Debezium application delivers every
change event **exactly-once**. This works pretty well when Flink consumes
Debezium produced events in this situation.
Review comment:
```suggestion
Under normal operating scenarios, the Debezium application delivers every
change event **exactly-once**. Flink works pretty well when consumes Debezium
produced events in this situation.
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java
##########
@@ -267,6 +302,20 @@ private static void validateScanSourceForStreaming(
scanSource.getClass().getName()
)
);
+ } else if (!changelogMode.containsOnly(RowKind.INSERT)) {
Review comment:
The condition`!changelogMode.containsOnly(RowKind.INSERT)` does not
indicate the source is a `CDC source which is non-upsert`
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -455,15 +455,12 @@ class TemporalJoinITCase(state: StateBackendMode)
" ON o.currency = r.currency"
tEnv.executeSql(sql).await()
- val rawResult = getRawResults("rowtime_default_sink")
val expected = List(
- "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
- "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
- "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
- "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
- "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
- "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
- assertEquals(expected.sorted, rawResult.sorted)
+ "1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
+ "3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
+ "4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
+ "2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02")
Review comment:
```suggestion
"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01")
```
minor: we can give a more readable order that is same as the left table
input.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]