This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fb671b77 Revert "The toreactstream is converted to the changlogstream
method. Support update, delete message. (#1895)" (#1972)
fb671b77 is described below
commit fb671b77f39c0f58601a90cae77b04cc37cd216d
Author: Kirs <[email protected]>
AuthorDate: Mon May 30 09:54:50 2022 +0800
Revert "The toreactstream is converted to the changlogstream method.
Support update, delete message. (#1895)" (#1972)
This reverts commit 8882f041ca52bcbc57fd5f383797807d5fc728e8.
---
.../org/apache/seatunnel/flink/stream/FlinkStreamExecution.java | 2 +-
.../src/main/java/org/apache/seatunnel/flink/util/TableUtil.java | 6 +++++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 3514bae4..43fa7daa 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -99,7 +99,7 @@ public class FlinkStreamExecution implements
Execution<FlinkStreamSource, FlinkS
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkEnvironment.getStreamTableEnvironment();
Table table =
tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME));
- return
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table,
false));
+ return
Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
}
return Optional.empty();
}
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
index c1c856e5..e1877eff 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java
@@ -39,7 +39,11 @@ public final class TableUtil {
if (isAppend) {
return tableEnvironment.toAppendStream(table, typeInfo);
}
- return tableEnvironment.toChangelogStream(table);
+ return tableEnvironment
+ .toRetractStream(table, typeInfo)
+ .filter(row -> row.f0)
+ .map(row -> row.f1)
+ .returns(typeInfo);
}
public static DataSet<Row> tableToDataSet(BatchTableEnvironment
tableEnvironment, Table table) {