This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 068c5e3e3e [Core][Flink] refactor flink proxy source/sink (#7355)
068c5e3e3e is described below

commit 068c5e3e3e8518e8cef2bd7599d85455d1adbaef
Author: Tyrantlucifer <[email protected]>
AuthorDate: Mon Aug 12 12:55:41 2024 +0800

    [Core][Flink] refactor flink proxy source/sink (#7355)
---
 .../seatunnel/api/table/type/SeaTunnelRow.java     |  15 ++
 .../core/starter/execution/PluginUtil.java         |   1 +
 .../core/starter/execution/SourceTableInfo.java    |   1 +
 .../flink/execution/FlinkRuntimeEnvironment.java   |  25 ---
 .../flink/execution/SinkExecuteProcessor.java      |  12 +-
 .../execution/AbstractFlinkRuntimeEnvironment.java |  18 --
 .../flink/execution/DataStreamTableInfo.java       |   8 +-
 .../FlinkAbstractPluginExecuteProcessor.java       |  22 +--
 .../flink/execution/FlinkRuntimeEnvironment.java   |  25 ---
 .../flink/execution/SinkExecuteProcessor.java      |  13 +-
 .../flink/execution/SourceExecuteProcessor.java    |   8 +-
 .../flink/execution/TransformExecuteProcessor.java |  53 ++----
 .../seatunnel/engine/e2e/UnifyEnvParameterIT.java  |  16 --
 .../seatunnel-flink-connector-v2-example/pom.xml   |   6 +
 .../main/resources/examples/fake_to_console.conf   |  12 +-
 .../plugin/discovery/AbstractPluginDiscovery.java  |   1 +
 .../flink/utils/TypeConverterUtilsTest.java        | 161 ----------------
 .../flink/serialization/FlinkRowConverter.java     | 154 ---------------
 .../translation/flink/sink/FlinkSinkWriter.java    |  16 +-
 .../flink/source/FlinkRowCollector.java            |  21 +--
 .../translation/flink/source/FlinkSource.java      |  17 +-
 .../flink/source/FlinkSourceReader.java            |  12 +-
 .../flink/utils/TypeConverterUtils.java            | 210 ---------------------
 23 files changed, 103 insertions(+), 724 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 95a36b796c..11388dbb6a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -50,7 +50,16 @@ public final class SeaTunnelRow implements Serializable {
         this.tableId = tableId;
     }
 
+    /**
+     * The method will be removed in the future, please use {@link 
#setKind(RowKind)} instead of
+     * it.
+     */
+    @Deprecated
     public void setRowKind(RowKind kind) {
+        setKind(kind);
+    }
+
+    public void setKind(RowKind kind) {
         this.kind = kind;
     }
 
@@ -62,7 +71,13 @@ public final class SeaTunnelRow implements Serializable {
         return tableId;
     }
 
+    /** The method will be removed in the future, please use {@link 
#getKind()} instead of it. */
+    @Deprecated
     public RowKind getRowKind() {
+        return getKind();
+    }
+
+    public RowKind getKind() {
         return this.kind;
     }
 
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 166e581e2d..c47ea0b121 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -50,6 +50,7 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 
 /** The util used for Spark/Flink to create to SeaTunnelSource etc. */
+@SuppressWarnings("rawtypes")
 public class PluginUtil {
 
     protected static final String ENGINE_TYPE = "seatunnel";
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
index 529b9b4207..43642f5735 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 @Data
 @AllArgsConstructor
+@SuppressWarnings("rawtypes")
 public class SourceTableInfo {
 
     private SeaTunnelSource source;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index fcc25a6b9e..e3428c751e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -21,13 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,7 +43,6 @@ public class FlinkRuntimeEnvironment extends 
AbstractFlinkRuntimeEnvironment
     @Override
     public FlinkRuntimeEnvironment prepare() {
         createStreamEnvironment();
-        createStreamTableEnvironment();
         if (config.hasPath("job.name")) {
             jobName = config.getString("job.name");
         }
@@ -63,23 +55,6 @@ public class FlinkRuntimeEnvironment extends 
AbstractFlinkRuntimeEnvironment
         return this;
     }
 
-    private void createStreamTableEnvironment() {
-        EnvironmentSettings environmentSettings =
-                
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
-        tableEnvironment =
-                StreamTableEnvironment.create(getStreamExecutionEnvironment(), 
environmentSettings);
-        TableConfig config = tableEnvironment.getConfig();
-        if (EnvironmentUtil.hasPathAndWaring(this.config, 
ConfigKeyName.MAX_STATE_RETENTION_TIME)
-                && EnvironmentUtil.hasPathAndWaring(
-                        this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
-            long max = 
this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
-            long min = 
this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
-            config.setIdleStateRetentionTime(Time.seconds(min), 
Time.seconds(max));
-        }
-        // init flink table env config
-        EnvironmentUtil.initTableEnvironmentConfiguration(this.config, 
config.getConfiguration());
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6a272aadb2..51586beaf0 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -51,6 +51,7 @@ import java.util.stream.Collectors;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
@@ -107,12 +108,15 @@ public class SinkExecuteProcessor
                                         
sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                SeaTunnelRowType sourceType = 
stream.getCatalogTable().getSeaTunnelRowType();
+                // TODO sink support multi table
+                SeaTunnelRowType sourceType =
+                        stream.getCatalogTables().get(0).getSeaTunnelRowType();
                 sink.setTypeInfo(sourceType);
             } else {
+                // TODO sink support multi table
                 TableSinkFactoryContext context =
                         TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                stream.getCatalogTable(),
+                                stream.getCatalogTables().get(0),
                                 ReadonlyConfig.fromConfig(sinkConfig),
                                 classLoader,
                                 ((TableSinkFactory) factory.get())
@@ -134,8 +138,8 @@ public class SinkExecuteProcessor
             }
             DataStreamSink<Row> dataStreamSink =
                     stream.getDataStream()
-                            .sinkTo(new FlinkSink<>(sink, 
stream.getCatalogTable()))
-                            .name(sink.getPluginName());
+                            .sinkTo(new FlinkSink<>(sink, 
stream.getCatalogTables().get(0)))
+                            .name(String.format("%s-Sink", 
sink.getPluginName()));
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index d805c286f8..34d9184277 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
 import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
@@ -37,11 +36,8 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.TernaryBoolean;
 
 import lombok.extern.slf4j.Slf4j;
@@ -55,10 +51,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public abstract class AbstractFlinkRuntimeEnvironment implements 
RuntimeEnvironment {
 
-    protected static final String RESULT_TABLE_NAME = "result_table_name";
     protected Config config;
     protected StreamExecutionEnvironment environment;
-    protected StreamTableEnvironment tableEnvironment;
     protected JobMode jobMode;
     protected String jobName = Constants.LOGO;
 
@@ -78,10 +72,6 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
         return EnvironmentUtil.checkRestartStrategy(config);
     }
 
-    public StreamTableEnvironment getStreamTableEnvironment() {
-        return tableEnvironment;
-    }
-
     public StreamExecutionEnvironment getStreamExecutionEnvironment() {
         return environment;
     }
@@ -228,14 +218,6 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
         }
     }
 
-    public void registerResultTable(Config config, DataStream<Row> dataStream, 
String name) {
-        StreamTableEnvironment tableEnvironment = 
this.getStreamTableEnvironment();
-        if (!TableUtil.tableExists(tableEnvironment, name)) {
-            tableEnvironment.createTemporaryView(
-                    name, tableEnvironment.fromChangelogStream(dataStream));
-        }
-    }
-
     public boolean isStreaming() {
         return JobMode.STREAMING.equals(jobMode);
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
index 7b158ee60b..a80a09b506 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
@@ -18,20 +18,22 @@
 package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import java.util.List;
+
 @Data
 @AllArgsConstructor
 public class DataStreamTableInfo {
 
-    private DataStream<Row> dataStream;
+    private DataStream<SeaTunnelRow> dataStream;
 
-    private CatalogTable catalogTable;
+    private List<CatalogTable> catalogTables;
 
     private String tableName;
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 565b7379bf..57956db56c 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -23,12 +23,6 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -36,8 +30,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 
-import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
-
 public abstract class FlinkAbstractPluginExecuteProcessor<T>
         implements PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment> {
 
@@ -84,10 +76,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
     protected Optional<DataStreamTableInfo> fromSourceTable(
             Config pluginConfig, List<DataStreamTableInfo> 
upstreamDataStreams) {
         if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-            StreamTableEnvironment tableEnvironment =
-                    flinkRuntimeEnvironment.getStreamTableEnvironment();
             String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
-            Table table = tableEnvironment.from(tableName);
             DataStreamTableInfo dataStreamTableInfo =
                     upstreamDataStreams.stream()
                             .filter(info -> 
tableName.equals(info.getTableName()))
@@ -99,20 +88,13 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
                                                             "table %s not 
found", tableName)));
             return Optional.of(
                     new DataStreamTableInfo(
-                            TableUtil.tableToDataStream(tableEnvironment, 
table),
-                            dataStreamTableInfo.getCatalogTable(),
+                            dataStreamTableInfo.getDataStream(),
+                            dataStreamTableInfo.getCatalogTables(),
                             tableName));
         }
         return Optional.empty();
     }
 
-    protected void registerResultTable(Config pluginConfig, DataStream<Row> 
dataStream) {
-        if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
-            String resultTable = 
pluginConfig.getString(RESULT_TABLE_NAME.key());
-            flinkRuntimeEnvironment.registerResultTable(pluginConfig, 
dataStream, resultTable);
-        }
-    }
-
     protected abstract List<T> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs);
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index c1de8ff4f7..e3428c751e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -21,13 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,7 +43,6 @@ public class FlinkRuntimeEnvironment extends 
AbstractFlinkRuntimeEnvironment
     @Override
     public FlinkRuntimeEnvironment prepare() {
         createStreamEnvironment();
-        createStreamTableEnvironment();
         if (config.hasPath("job.name")) {
             jobName = config.getString("job.name");
         }
@@ -63,23 +55,6 @@ public class FlinkRuntimeEnvironment extends 
AbstractFlinkRuntimeEnvironment
         return this;
     }
 
-    private void createStreamTableEnvironment() {
-        EnvironmentSettings environmentSettings =
-                EnvironmentSettings.newInstance().inStreamingMode().build();
-        tableEnvironment =
-                StreamTableEnvironment.create(getStreamExecutionEnvironment(), 
environmentSettings);
-        TableConfig config = tableEnvironment.getConfig();
-        if (EnvironmentUtil.hasPathAndWaring(this.config, 
ConfigKeyName.MAX_STATE_RETENTION_TIME)
-                && EnvironmentUtil.hasPathAndWaring(
-                        this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
-            long max = 
this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
-            long min = 
this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
-            config.setIdleStateRetentionTime(Time.seconds(min), 
Time.seconds(max));
-        }
-        // init flink table env config
-        EnvironmentUtil.initTableEnvironmentConfiguration(this.config, 
config.getConfiguration());
-    }
-
     public static FlinkRuntimeEnvironment getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (FlinkRuntimeEnvironment.class) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 1424746455..c713593821 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
+@SuppressWarnings("unchecked,rawtypes")
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
@@ -108,12 +109,15 @@ public class SinkExecuteProcessor
                                         
sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                SeaTunnelRowType sourceType = 
stream.getCatalogTable().getSeaTunnelRowType();
+                // TODO sink support multi table
+                SeaTunnelRowType sourceType =
+                        stream.getCatalogTables().get(0).getSeaTunnelRowType();
                 sink.setTypeInfo(sourceType);
             } else {
+                // TODO sink support multi table
                 TableSinkFactoryContext context =
                         TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                stream.getCatalogTable(),
+                                stream.getCatalogTables().get(0),
                                 ReadonlyConfig.fromConfig(sinkConfig),
                                 classLoader,
                                 ((TableSinkFactory) factory.get())
@@ -137,8 +141,9 @@ public class SinkExecuteProcessor
                     stream.getDataStream()
                             .sinkTo(
                                     SinkV1Adapter.wrap(
-                                            new FlinkSink<>(sink, 
stream.getCatalogTable())))
-                            .name(sink.getPluginName());
+                                            new FlinkSink<>(
+                                                    sink, 
stream.getCatalogTables().get(0))))
+                            .name(String.format("%s-Sink", 
sink.getPluginName()));
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 20b74f4b71..eeb757a853 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
@@ -71,21 +72,20 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
             Config pluginConfig = pluginConfigs.get(i);
             FlinkSource flinkSource = new FlinkSource<>(internalSource, 
envConfig);
 
-            DataStreamSource sourceStream =
+            DataStreamSource<SeaTunnelRow> sourceStream =
                     executionEnvironment.fromSource(
                             flinkSource,
                             WatermarkStrategy.noWatermarks(),
-                            String.format("%s-source", 
internalSource.getPluginName()));
+                            String.format("%s-Source", 
internalSource.getPluginName()));
 
             if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
             }
-            registerResultTable(pluginConfig, sourceStream);
             sources.add(
                     new DataStreamTableInfo(
                             sourceStream,
-                            sourceTableInfo.getCatalogTables().get(0),
+                            sourceTableInfo.getCatalogTables(),
                             pluginConfig.hasPath(RESULT_TABLE_NAME.key())
                                     ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
                                     : null));
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 1ff2cf6437..c92eaf42a9 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,19 +25,15 @@ import 
org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
+import org.apache.flink.streaming.api.operators.StreamMap;
 
 import java.net.URL;
 import java.util.Collections;
@@ -47,6 +43,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 
+@SuppressWarnings("unchecked,rawtypes")
 public class TransformExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
 
@@ -97,21 +94,20 @@ public class TransformExecuteProcessor
                 TableTransformFactory factory = plugins.get(i);
                 TableTransformFactoryContext context =
                         new TableTransformFactoryContext(
-                                
Collections.singletonList(stream.getCatalogTable()),
+                                stream.getCatalogTables(),
                                 ReadonlyConfig.fromConfig(pluginConfig),
                                 classLoader);
                 
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
                 SeaTunnelTransform transform = 
factory.createTransform(context).createTransform();
 
-                SeaTunnelRowType sourceType = 
stream.getCatalogTable().getSeaTunnelRowType();
                 transform.setJobContext(jobContext);
-                DataStream<Row> inputStream =
-                        flinkTransform(sourceType, transform, 
stream.getDataStream());
-                registerResultTable(pluginConfig, inputStream);
+                DataStream<SeaTunnelRow> inputStream =
+                        flinkTransform(transform, stream.getDataStream());
+                // TODO transform support multi tables
                 upstreamDataStreams.add(
                         new DataStreamTableInfo(
                                 inputStream,
-                                transform.getProducedCatalogTable(),
+                                
Collections.singletonList(transform.getProducedCatalogTable()),
                                 pluginConfig.hasPath(RESULT_TABLE_NAME.key())
                                         ? 
pluginConfig.getString(RESULT_TABLE_NAME.key())
                                         : null));
@@ -126,28 +122,17 @@ public class TransformExecuteProcessor
         return upstreamDataStreams;
     }
 
-    protected DataStream<Row> flinkTransform(
-            SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
-        TypeInformation rowTypeInfo =
-                TypeConverterUtils.convert(
-                        transform.getProducedCatalogTable().getSeaTunnelRowType());
-        FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
-        FlinkRowConverter transformOutputRowConverter =
-                new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
-        DataStream<Row> output =
-                stream.flatMap(
-                        (FlatMapFunction<Row, Row>)
-                                (value, out) -> {
-                                    SeaTunnelRow seaTunnelRow =
-                                            transformInputRowConverter.reconvert(value);
-                                    SeaTunnelRow dataRow =
-                                            (SeaTunnelRow) transform.map(seaTunnelRow);
-                                    if (dataRow != null) {
-                                        Row copy = transformOutputRowConverter.convert(dataRow);
-                                        out.collect(copy);
-                                    }
-                                },
-                        rowTypeInfo);
-        return output;
+    protected DataStream<SeaTunnelRow> flinkTransform(
+            SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
+        return stream.transform(
+                String.format("%s-Transform", transform.getPluginName()),
+                TypeInformation.of(SeaTunnelRow.class),
+                new StreamMap<>(
+                        flinkRuntimeEnvironment
+                                .getStreamExecutionEnvironment()
+                                .clean(
+                                        row ->
+                                                ((SeaTunnelTransform<SeaTunnelRow>) transform)
+                                                        .map(row))));
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
index ad5c6365f2..48485eb69c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
@@ -126,22 +126,6 @@ public class UnifyEnvParameterIT extends TestSuiteBase {
                             }
                             Assertions.assertNotNull(jobInfoReference.get());
                         });
-        Map<String, Object> jobInfo = jobInfoReference.get();
-
-        /**
-         * 'table.exec.resource.default-parallelism' has a higher priority than 'parallelism', so
-         * one of these nodes must have a parallelism of 2.
-         */
-        Map<String, Object> plan = (Map<String, Object>) jobInfo.get("plan");
-        List<Map<String, Object>> nodes = (List<Map<String, Object>>) plan.get("nodes");
-        boolean tableExecParallelism = false;
-        for (Map<String, Object> node : nodes) {
-            int parallelism = (int) node.get("parallelism");
-            if (!tableExecParallelism && parallelism == 2) {
-                tableExecParallelism = true;
-            }
-        }
-        Assertions.assertTrue(tableExecParallelism);
     }
 
     public void genericTest(String configPath, AbstractTestFlinkContainer container)
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 99c75d324a..ef801bdb9c 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -112,6 +112,12 @@
             <version>${flink.1.15.3.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.1.15.3.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>mockwebserver</artifactId>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 799dac7996..12c1f9f281 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -19,7 +19,7 @@
 ######
 
 env {
-  job.mode = "BATCH"
+  job.mode = "STREAMING"
   parallelism = 2
 }
 
@@ -41,15 +41,21 @@ source {
 }
 
 transform {
-
+  Copy {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    fields {
+      name1 = name
+    }
+  }
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/category/transform-v2
 }
 
 sink {
   Console {
+    source_table_name = "fake1"
   }
-
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index d4bd43c3d1..4b62895f18 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -66,6 +66,7 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 @Slf4j
+@SuppressWarnings("unchecked")
 public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
 
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
deleted file mode 100644
index 95cfa335e7..0000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TypeConverterUtilsTest {
-    // --------------------------------------------------------------
-    // basic types test
-    // --------------------------------------------------------------
-
-    @Test
-    public void convertStringType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.STRING_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.STRING_TYPE));
-    }
-
-    @Test
-    public void convertIntegerType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.INT_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.INT_TYPE));
-    }
-
-    @Test
-    public void convertBooleanType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                TypeConverterUtils.convert(BasicType.BOOLEAN_TYPE));
-    }
-
-    @Test
-    public void convertDoubleType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.DOUBLE_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.DOUBLE_TYPE));
-    }
-
-    @Test
-    public void convertLongType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.LONG_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.LONG_TYPE));
-    }
-
-    @Test
-    public void convertFloatType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.FLOAT_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.FLOAT_TYPE));
-    }
-
-    @Test
-    public void convertByteType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.BYTE_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.BYTE_TYPE));
-    }
-
-    @Test
-    public void convertShortType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.SHORT_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.SHORT_TYPE));
-    }
-
-    @Test
-    public void convertBigDecimalType() {
-        /**
-         * To solve lost precision and scale of {@link
-         * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
-         * 
org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the 
convert
-         * result of {@link org.apache.seatunnel.api.table.type.DecimalType} 
instance.
-         */
-        Assertions.assertEquals(
-                BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new 
DecimalType(30, 2)));
-    }
-
-    @Test
-    public void convertNullType() {
-        Assertions.assertEquals(
-                BasicTypeInfo.VOID_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.VOID_TYPE));
-    }
-
-    // --------------------------------------------------------------
-    // array types test
-    // --------------------------------------------------------------
-
-    @Test
-    public void convertBooleanArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.BOOLEAN_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertStringArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.STRING_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertDoubleArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.DOUBLE_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertIntegerArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.INT_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertLongArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.LONG_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertFloatArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.FLOAT_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertByteArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.BYTE_ARRAY_TYPE));
-    }
-
-    @Test
-    public void convertShortArrayType() {
-        Assertions.assertEquals(
-                BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO,
-                TypeConverterUtils.convert(ArrayType.SHORT_ARRAY_TYPE));
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
deleted file mode 100644
index b24cb96dfe..0000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.translation.serialization.RowConverter;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-/**
- * The row converter between {@link Row} and {@link SeaTunnelRow}, used to 
convert or reconvert
- * between flink row and seatunnel row
- */
-@Slf4j
-public class FlinkRowConverter extends RowConverter<Row> {
-
-    public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public Row convert(SeaTunnelRow seaTunnelRow) throws IOException {
-        validate(seaTunnelRow);
-        return (Row) convert(seaTunnelRow, dataType);
-    }
-
-    private static Object convert(Object field, SeaTunnelDataType<?> dataType) 
{
-        if (field == null) {
-            return null;
-        }
-        SqlType sqlType = dataType.getSqlType();
-        switch (sqlType) {
-            case ROW:
-                SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
-                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-                int arity = rowType.getTotalFields();
-                Row engineRow = new Row(arity);
-                for (int i = 0; i < arity; i++) {
-                    engineRow.setField(
-                            i, convert(seaTunnelRow.getField(i), 
rowType.getFieldType(i)));
-                }
-                
engineRow.setKind(RowKind.fromByteValue(seaTunnelRow.getRowKind().toByteValue()));
-                return engineRow;
-            case MAP:
-                return convertMap(
-                        (Map<?, ?>) field, (MapType<?, ?>) dataType, 
FlinkRowConverter::convert);
-
-                /**
-                 * To solve lost precision and scale of {@link
-                 * org.apache.seatunnel.api.table.type.DecimalType}, use 
{@link java.lang.String} as
-                 * the convert result of {@link java.math.BigDecimal} instance.
-                 */
-            case DECIMAL:
-                BigDecimal decimal = (BigDecimal) field;
-                return decimal.toString();
-            default:
-                return field;
-        }
-    }
-
-    private static Object convertMap(
-            Map<?, ?> mapData,
-            MapType<?, ?> mapType,
-            BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
-        if (mapData == null || mapData.isEmpty()) {
-            return mapData;
-        }
-
-        Map<Object, Object> newMap = new HashMap<>(mapData.size());
-        mapData.forEach(
-                (key, value) -> {
-                    SeaTunnelDataType<?> keyType = mapType.getKeyType();
-                    SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    newMap.put(
-                            convertFunction.apply(key, keyType),
-                            convertFunction.apply(value, valueType));
-                });
-        return newMap;
-    }
-
-    @Override
-    public SeaTunnelRow reconvert(Row engineRow) throws IOException {
-        return (SeaTunnelRow) reconvert(engineRow, dataType);
-    }
-
-    private static Object reconvert(Object field, SeaTunnelDataType<?> 
dataType) {
-        if (field == null) {
-            return null;
-        }
-        SqlType sqlType = dataType.getSqlType();
-        switch (sqlType) {
-            case ROW:
-                Row engineRow = (Row) field;
-                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-                int arity = rowType.getTotalFields();
-                SeaTunnelRow seaTunnelRow = new SeaTunnelRow(arity);
-                for (int i = 0; i < arity; i++) {
-                    seaTunnelRow.setField(
-                            i, reconvert(engineRow.getField(i), 
rowType.getFieldType(i)));
-                }
-                seaTunnelRow.setRowKind(
-                        
org.apache.seatunnel.api.table.type.RowKind.fromByteValue(
-                                engineRow.getKind().toByteValue()));
-                return seaTunnelRow;
-            case MAP:
-                return convertMap(
-                        (Map<?, ?>) field, (MapType<?, ?>) dataType, 
FlinkRowConverter::reconvert);
-
-                /**
-                 * To solve lost precision and scale of {@link
-                 * org.apache.seatunnel.api.table.type.DecimalType}, create 
{@link
-                 * java.math.BigDecimal} instance from {@link 
java.lang.String} type field.
-                 */
-            case DECIMAL:
-                DecimalType decimalType = (DecimalType) dataType;
-                String decimalData = (String) field;
-                BigDecimal decimal = new BigDecimal(decimalData);
-                decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
-                return decimal;
-            default:
-                return field;
-        }
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 3c949f6480..725bf606f9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -25,11 +25,9 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.types.Row;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -54,7 +52,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
 
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT>
             sinkWriter;
-    private final FlinkRowConverter rowSerialization;
 
     private final Counter sinkWriteCount;
 
@@ -73,7 +70,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
             MetricsContext metricsContext) {
         this.sinkWriter = sinkWriter;
         this.checkpointId = checkpointId;
-        this.rowSerialization = new FlinkRowConverter(dataType);
         this.sinkWriteCount = metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
         this.sinkWriteBytes = metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
         this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
@@ -86,15 +82,17 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
 
     @Override
     public void write(InputT element, SinkWriter.Context context) throws IOException {
-        if (element instanceof Row) {
-            SeaTunnelRow seaTunnelRow = rowSerialization.reconvert((Row) element);
-            sinkWriter.write(seaTunnelRow);
+        if (element == null) {
+            return;
+        }
+        if (element instanceof SeaTunnelRow) {
+            sinkWriter.write((SeaTunnelRow) element);
             sinkWriteCount.inc();
-            sinkWriteBytes.inc(seaTunnelRow.getBytesSize());
+            sinkWriteBytes.inc(((SeaTunnelRow) element).getBytesSize());
             sinkWriterQPS.markEvent();
         } else {
             throw new InvalidClassException(
-                    "only support Flink Row at now, the element Class is " + element.getClass());
+                    "only support SeaTunnelRow at now, the element Class is " + element.getClass());
         }
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index 39b14d17d0..2ea584029e 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -25,26 +25,18 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.types.Row;
 
 import lombok.extern.slf4j.Slf4j;
 
-/**
- * The implementation of {@link Collector} for flink engine, as a container for {@link SeaTunnelRow}
- * and convert {@link SeaTunnelRow} to {@link Row}.
- */
+/** The implementation of {@link Collector} for flink engine. */
 @Slf4j
 public class FlinkRowCollector implements Collector<SeaTunnelRow> {
 
-    private ReaderOutput<Row> readerOutput;
-
-    private final FlinkRowConverter rowSerialization;
+    private ReaderOutput<SeaTunnelRow> readerOutput;
 
     private final FlowControlGate flowControlGate;
 
@@ -54,9 +46,7 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
 
     private final Meter sourceReadQPS;
 
-    public FlinkRowCollector(
-            SeaTunnelRowType seaTunnelRowType, Config envConfig, MetricsContext metricsContext) {
-        this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
+    public FlinkRowCollector(Config envConfig, MetricsContext metricsContext) {
         this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
         this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
         this.sourceReadBytes = metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES);
@@ -67,8 +57,7 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
     public void collect(SeaTunnelRow record) {
         flowControlGate.audit(record);
         try {
-            Row row = rowSerialization.convert(record);
-            readerOutput.collect(row);
+            readerOutput.collect(record);
             sourceReadCount.inc();
             sourceReadBytes.inc(record.getBytesSize());
             sourceReadQPS.markEvent();
@@ -82,7 +71,7 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
         return this;
     }
 
-    public FlinkRowCollector withReaderOutput(ReaderOutput<Row> readerOutput) {
+    public FlinkRowCollector withReaderOutput(ReaderOutput<SeaTunnelRow> readerOutput) {
         this.readerOutput = readerOutput;
         return this;
     }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index adf54eef4c..7868e6d3ef 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -24,9 +24,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -37,7 +35,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.types.Row;
 
 import java.io.Serializable;
 
@@ -48,7 +45,8 @@ import java.io.Serializable;
  * @param <EnumStateT> The generic type of enumerator state
  */
 public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializable>
-        implements Source<Row, SplitWrapper<SplitT>, EnumStateT>, ResultTypeQueryable<Row> {
+        implements Source<SeaTunnelRow, SplitWrapper<SplitT>, EnumStateT>,
+                ResultTypeQueryable<SeaTunnelRow> {
 
     private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
 
@@ -68,14 +66,13 @@ public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializ
     }
 
     @Override
-    public SourceReader<Row, SplitWrapper<SplitT>> createReader(SourceReaderContext readerContext)
-            throws Exception {
+    public SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> createReader(
+            SourceReaderContext readerContext) throws Exception {
         org.apache.seatunnel.api.source.SourceReader.Context context =
                 new FlinkSourceReaderContext(readerContext, source);
         org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> reader =
                 source.createReader(context);
-        return new FlinkSourceReader<>(
-                reader, context, envConfig, (SeaTunnelRowType) source.getProducedType());
+        return new FlinkSourceReader<>(reader, context, envConfig);
     }
 
     @Override
@@ -110,7 +107,7 @@ public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializ
     }
 
     @Override
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
+    public TypeInformation<SeaTunnelRow> getProducedType() {
+        return TypeInformation.of(SeaTunnelRow.class);
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 65dc432477..c2f9cde500 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -21,13 +21,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.types.Row;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@ import java.util.stream.Collectors;
  * @param <SplitT>
  */
 public class FlinkSourceReader<SplitT extends SourceSplit>
-        implements SourceReader<Row, SplitWrapper<SplitT>> {
+        implements SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> {
 
     private final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReader.class);
 
@@ -58,12 +56,10 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     public FlinkSourceReader(
             org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader,
             org.apache.seatunnel.api.source.SourceReader.Context context,
-            Config envConfig,
-            SeaTunnelRowType seaTunnelRowType) {
+            Config envConfig) {
         this.sourceReader = sourceReader;
         this.context = context;
-        this.flinkRowCollector =
-                new FlinkRowCollector(seaTunnelRowType, envConfig, context.getMetricsContext());
+        this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext());
     }
 
     @Override
@@ -76,7 +72,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     }
 
     @Override
-    public InputStatus pollNext(ReaderOutput<Row> output) throws Exception {
+    public InputStatus pollNext(ReaderOutput<SeaTunnelRow> output) throws Exception {
         if (!((FlinkSourceReaderContext) context).isSendNoMoreElementEvent()) {
             sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
         } else {
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
deleted file mode 100644
index ebb77da268..0000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TypeConverterUtils {
-
-    private static final Map<Class<?>, BridgedType> BRIDGED_TYPES = new HashMap<>(32);
-
-    static {
-        // basic types
-        BRIDGED_TYPES.put(
-                String.class,
-                BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Boolean.class,
-                BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Double.class,
-                BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
-        /**
-         * To solve lost precision and scale of {@link
-         * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
-         * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the payload of
-         * {@link org.apache.seatunnel.api.table.type.DecimalType}.
-         */
-        BRIDGED_TYPES.put(
-                BigDecimal.class,
-                BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO));
-
-        // data time types
-        BRIDGED_TYPES.put(
-                LocalDate.class,
-                BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
-        BRIDGED_TYPES.put(
-                LocalTime.class,
-                BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
-        BRIDGED_TYPES.put(
-                LocalDateTime.class,
-                BridgedType.of(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME));
-        // basic array types
-        BRIDGED_TYPES.put(
-                byte[].class,
-                BridgedType.of(
-                        PrimitiveByteArrayType.INSTANCE,
-                        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                String[].class,
-                BridgedType.of(
-                        ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Boolean[].class,
-                BridgedType.of(
-                        ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Byte[].class,
-                BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Short[].class,
-                BridgedType.of(
-                        ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Integer[].class,
-                BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Long[].class,
-                BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Float[].class,
-                BridgedType.of(
-                        ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Double[].class,
-                BridgedType.of(
-                        ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO));
-    }
-
-    private TypeConverterUtils() {
-        throw new UnsupportedOperationException(
-                "TypeConverterUtils is a utility class and cannot be instantiated");
-    }
-
-    public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
-        BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
-        if (bridgedType != null) {
-            return bridgedType.getSeaTunnelType();
-        }
-
-        if (dataType instanceof MapTypeInfo) {
-            MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
-            return new MapType<>(
-                    convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo()));
-        }
-        if (dataType instanceof RowTypeInfo) {
-            RowTypeInfo typeInformation = (RowTypeInfo) dataType;
-            String[] fieldNames = typeInformation.getFieldNames();
-            SeaTunnelDataType<?>[] seaTunnelDataTypes =
-                    Arrays.stream(typeInformation.getFieldTypes())
-                            .map(TypeConverterUtils::convert)
-                            .toArray(SeaTunnelDataType[]::new);
-            return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
-        }
-        throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType);
-    }
-
-    public static TypeInformation<?> convert(SeaTunnelDataType<?> dataType) {
-        BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
-        if (bridgedType != null) {
-            return bridgedType.getFlinkType();
-        }
-
-        if (dataType instanceof MapType) {
-            MapType<?, ?> mapType = (MapType<?, ?>) dataType;
-            return new MapTypeInfo<>(
-                    convert(mapType.getKeyType()), convert(mapType.getValueType()));
-        }
-
-        if (dataType instanceof ArrayType) {
-            ArrayType arrayType = (ArrayType) dataType;
-            return ObjectArrayTypeInfo.getInfoFor(
-                    arrayType.getTypeClass(), convert(arrayType.getElementType()));
-        }
-
-        if (dataType instanceof SeaTunnelRowType) {
-            SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-            TypeInformation<?>[] types =
-                    Arrays.stream(rowType.getFieldTypes())
-                            .map(TypeConverterUtils::convert)
-                            .toArray(TypeInformation[]::new);
-            return new RowTypeInfo(types, rowType.getFieldNames());
-        }
-        throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
-    }
-
-    public static class BridgedType {
-        private final SeaTunnelDataType<?> seaTunnelType;
-        private final TypeInformation<?> flinkType;
-
-        private BridgedType(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
-            this.seaTunnelType = seaTunnelType;
-            this.flinkType = flinkType;
-        }
-
-        public static BridgedType of(
-                SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
-            return new BridgedType(seaTunnelType, flinkType);
-        }
-
-        public TypeInformation<?> getFlinkType() {
-            return flinkType;
-        }
-
-        public SeaTunnelDataType<?> getSeaTunnelType() {
-            return seaTunnelType;
-        }
-    }
-}

Reply via email to