This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch revert-1895-dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 685cc918a43f7dd67651beba8d4d5c969c5b5edb
Author: Kirs <[email protected]>
AuthorDate: Mon May 30 09:07:07 2022 +0800

    Revert "The toreactstream is converted to the changlogstream method. Support update, delete message. (#1895)"
    
    This reverts commit 8882f041ca52bcbc57fd5f383797807d5fc728e8.
---
 .../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java     | 2 +-
 .../src/main/java/org/apache/seatunnel/flink/util/TableUtil.java    | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 3514bae4..43fa7daa 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -99,7 +99,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
             StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment();
             Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, false));
+            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
         }
         return Optional.empty();
     }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
index c1c856e5..e1877eff 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
@@ -39,7 +39,11 @@ public final class TableUtil {
         if (isAppend) {
             return tableEnvironment.toAppendStream(table, typeInfo);
         }
-        return tableEnvironment.toChangelogStream(table);
+        return tableEnvironment
+                .toRetractStream(table, typeInfo)
+                .filter(row -> row.f0)
+                .map(row -> row.f1)
+                .returns(typeInfo);
     }
 
     public static DataSet<Row> tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) {

Reply via email to