This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch revert-1895-dev in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 685cc918a43f7dd67651beba8d4d5c969c5b5edb Author: Kirs <[email protected]> AuthorDate: Mon May 30 09:07:07 2022 +0800 Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)" This reverts commit 8882f041ca52bcbc57fd5f383797807d5fc728e8. --- .../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java | 2 +- .../src/main/java/org/apache/seatunnel/flink/util/TableUtil.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java index 3514bae4..43fa7daa 100644 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java +++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java @@ -99,7 +99,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment(); Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME)); - return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, false)); + return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true)); } return Optional.empty(); } diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java index c1c856e5..e1877eff 100644 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java +++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java @@ -39,7 +39,11 @@ public final class TableUtil { if (isAppend) { return tableEnvironment.toAppendStream(table, typeInfo); } - return tableEnvironment.toChangelogStream(table); + return tableEnvironment + .toRetractStream(table, typeInfo) + .filter(row -> row.f0) + .map(row -> row.f1) + .returns(typeInfo); } public static DataSet<Row> tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) {
