This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new b23266e0 Add SeaTunnelRowTypeInfo to support source return record type 
(#1894)
b23266e0 is described below

commit b23266e0e1b6fad4dc95ad01ae09d66515194181
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 17 16:22:13 2022 +0800

    Add SeaTunnelRowTypeInfo to support source return record type (#1894)
    
    * Add SeaTunnelRowTypeInfo to support source return record type
---
 .../seatunnel/api/common/SeaTunnelContext.java     | 68 ++++++++++++++++++++++
 .../seatunnel/api/source/SeaTunnelSource.java      |  8 +++
 .../apache/seatunnel/api/table/catalog/Column.java |  5 +-
 .../api/table/type/SeaTunnelDataType.java          |  4 +-
 ...nnelDataType.java => SeaTunnelRowTypeInfo.java} | 17 ++++--
 .../seatunnel/fake/source/FakeSource.java          | 10 ++++
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 35 +++++++++--
 .../seatunnel-flink-examples/pom.xml               |  6 ++
 .../seatunnel-translation-flink/pom.xml            |  6 ++
 .../flink/source/SeaTunnelParallelSource.java      | 21 ++++---
 .../flink/source/WrappedRowCollector.java          |  8 +--
 .../flink/utils/TypeConverterUtilsTest.java        | 22 +++++--
 12 files changed, 181 insertions(+), 29 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
new file mode 100644
index 00000000..271bc706
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.common.config.Common;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is used to store the context of the application, e.g. the table 
schema, catalog, etc.
+ */
+public final class SeaTunnelContext implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
+
+    // tableName -> tableSchema
+    private Map<String, TableSchema> tableSchemaMap = new 
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+    public static SeaTunnelContext getContext() {
+        return INSTANCE;
+    }
+
+    /**
+     * Put table schema.
+     *
+     * @param tableName   table name
+     * @param tableSchema table schema
+     */
+    public void addSchema(String tableName, TableSchema tableSchema) {
+        tableSchemaMap.put(tableName, tableSchema);
+    }
+
+    /**
+     * Get table schema.
+     *
+     * @param tableName table name.
+     * @return table schema.
+     */
+    public Optional<TableSchema> getSchema(String tableName) {
+        return Optional.ofNullable(tableSchemaMap.get(tableName));
+    }
+
+    private SeaTunnelContext() {
+        // no-op
+    }
+
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index c1f145bc..cf48428a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.source;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 
 import java.io.Serializable;
 
@@ -39,6 +40,13 @@ public interface SeaTunnelSource<T, SplitT extends 
SourceSplit, StateT> extends
      */
     Boundedness getBoundedness();
 
+    /**
+     * Get the row type information of the records produced by this source.
+     *
+     * @return SeaTunnel row type information.
+     */
+    SeaTunnelRowTypeInfo getRowTypeInfo();
+
     /**
      * Create source reader, used to produce data.
      *
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index c1f5ed78..aea7fade 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -20,11 +20,14 @@ package org.apache.seatunnel.api.table.catalog;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import java.io.Serializable;
 import java.util.Objects;
 import java.util.Optional;
 
 @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
-public abstract class Column {
+public abstract class Column implements Serializable {
+
+    private static final long serialVersionUID = -1L;
 
     /**
      * column name.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index d25361f1..2ab1b22c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -17,10 +17,12 @@
 
 package org.apache.seatunnel.api.table.type;
 
+import java.io.Serializable;
+
 /**
  * Logic data type of column in SeaTunnel.
  */
-public interface SeaTunnelDataType<T> {
+public interface SeaTunnelDataType<T> extends Serializable {
 
 
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
similarity index 72%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
index d25361f1..bd721b9d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
@@ -17,10 +17,19 @@
 
 package org.apache.seatunnel.api.table.type;
 
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> {
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
+@Data
+@AllArgsConstructor
+public class SeaTunnelRowTypeInfo {
+    /**
+     * The field names of the {@link SeaTunnelRow}.
+     */
+    private final String[] fieldNames;
+    /**
+     * The data type of each field.
+     */
+    private final SeaTunnelDataType<?>[] seaTunnelDataTypes;
 
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 074816b6..0e687517 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -23,7 +23,10 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
 
 import com.google.auto.service.AutoService;
@@ -36,6 +39,13 @@ public class FakeSource implements 
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
         return Boundedness.BOUNDED;
     }
 
+    @Override
+    public SeaTunnelRowTypeInfo getRowTypeInfo() {
+        return new SeaTunnelRowTypeInfo(
+            new String[]{"name", "age", "timestamp"},
+            new SeaTunnelDataType<?>[]{BasicType.STRING, BasicType.INTEGER, 
BasicType.LONG});
+    }
+
     @Override
     public SourceReader<SeaTunnelRow, FakeSourceSplit> 
createReader(SourceReader.Context readerContext) {
         return new FakeSourceReader(readerContext);
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index 14a23cd5..8673ba02 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -26,10 +26,10 @@ import 
org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.util.TableUtil;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
 import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
 import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
 
@@ -37,8 +37,12 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,15 +76,21 @@ public class SeaTunnelApiTaskExecuteCommand implements 
Command<FlinkCommandArgs>
 
         SeaTunnelParallelSource source = getSource(config);
         // todo: add basic type
-        Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
+        Sink<Row, Object, Object, Object> flinkSink = getSink(config);
 
         registerPlugins(flinkEnvironment);
 
         StreamExecutionEnvironment streamExecutionEnvironment = 
flinkEnvironment.getStreamExecutionEnvironment();
         // support multiple sources/sink
-        DataStreamSource<WrappedRow> dataStream = 
streamExecutionEnvironment.addSource(source);
+        DataStreamSource<Row> dataStream = 
streamExecutionEnvironment.addSource(source);
+        registerTable(dataStream, "fake_table", "name, age", flinkEnvironment);
         // todo: add transform
-        dataStream.sinkTo(flinkSink);
+        DataStream<Row> transformOutputStream = TableUtil.tableToDataStream(
+            flinkEnvironment.getStreamTableEnvironment(),
+            flinkEnvironment.getStreamTableEnvironment().sqlQuery("select * 
from fake_table"),
+            false);
+        // add sink
+        transformOutputStream.sinkTo(flinkSink);
         try {
             
streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
         } catch (Exception e) {
@@ -88,6 +98,19 @@ public class SeaTunnelApiTaskExecuteCommand implements 
Command<FlinkCommandArgs>
         }
     }
 
+    private void registerTable(DataStream<Row> dataStream, String tableName, 
String fields, FlinkEnvironment flinkEnvironment) {
+        StreamTableEnvironment streamTableEnvironment = 
flinkEnvironment.getStreamTableEnvironment();
+        if (!TableUtil.tableExists(streamTableEnvironment, tableName)) {
+            streamTableEnvironment.registerDataStream(tableName, dataStream, 
fields);
+        }
+    }
+
+    private DataStream<Row> fromSourceTable(FlinkEnvironment flinkEnvironment, 
String tableName) {
+        StreamTableEnvironment streamTableEnvironment = 
flinkEnvironment.getStreamTableEnvironment();
+        Table fakeTable = streamTableEnvironment.scan(tableName);
+        return TableUtil.tableToDataStream(streamTableEnvironment, fakeTable, 
true);
+    }
+
     private SeaTunnelParallelSource getSource(Config config) {
         PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
         // todo: use FactoryUtils to load the plugin
@@ -95,10 +118,10 @@ public class SeaTunnelApiTaskExecuteCommand implements 
Command<FlinkCommandArgs>
         return new 
SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
     }
 
-    private Sink<WrappedRow, Object, Object, Object> getSink(Config config) {
+    private Sink<Row, Object, Object, Object> getSink(Config config) {
         PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
-        FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object> 
flinkSinkConverter = new FlinkSinkConverter<>();
+        FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object> 
flinkSinkConverter = new FlinkSinkConverter<>();
         return 
flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier),
 Collections.emptyMap());
     }
 
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml 
b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index bd5caf5e..9058a6fe 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -81,6 +81,12 @@
             <version>${flink.version}</version>
             <scope>${flink.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml 
b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index 114d4134..1d371cc8 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -36,6 +36,12 @@
             <version>${project.version}</version>
         </dependency>
         <!-- apache flink table -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index 48fe0d1c..588d056d 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.translation.flink.source;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 import org.apache.seatunnel.translation.source.ParallelSource;
 
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -30,21 +30,24 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class SeaTunnelParallelSource extends 
RichParallelSourceFunction<WrappedRow>
-        implements CheckpointListener, ResultTypeQueryable<WrappedRow>, 
CheckpointedFunction {
+public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
+    implements CheckpointListener, ResultTypeQueryable<Row>, 
CheckpointedFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
     protected static final String PARALLEL_SOURCE_STATE_NAME = 
"parallel-source-states";
 
@@ -75,7 +78,7 @@ public class SeaTunnelParallelSource extends 
RichParallelSourceFunction<WrappedR
     }
 
     @Override
-    public void run(SourceFunction.SourceContext<WrappedRow> sourceContext) 
throws Exception {
+    public void run(SourceFunction.SourceContext<Row> sourceContext) throws 
Exception {
         parallelSource.run(new WrappedRowCollector(sourceContext));
     }
 
@@ -100,9 +103,11 @@ public class SeaTunnelParallelSource extends 
RichParallelSourceFunction<WrappedR
     }
 
     @Override
-    public TypeInformation<WrappedRow> getProducedType() {
-        // todo: add type transformation
-        return new KryoTypeInfo<>(WrappedRow.class);
+    public TypeInformation<Row> getProducedType() {
+        SeaTunnelRowTypeInfo rowTypeInfo = source.getRowTypeInfo();
+        TypeInformation<?>[] typeInformation = 
Arrays.stream(rowTypeInfo.getSeaTunnelDataTypes())
+            
.map(TypeConverterUtils::convertType).toArray(TypeInformation[]::new);
+        return new RowTypeInfo(typeInformation, rowTypeInfo.getFieldNames());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
index 1d0ea225..f5b6649e 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
@@ -20,25 +20,25 @@ package org.apache.seatunnel.translation.flink.source;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
 
 import java.io.IOException;
 
 public class WrappedRowCollector implements Collector<SeaTunnelRow> {
 
-    protected final SourceFunction.SourceContext<WrappedRow> internalCollector;
+    protected final SourceFunction.SourceContext<Row> internalCollector;
     protected final FlinkRowSerialization rowSerialization = new 
FlinkRowSerialization();
 
-    public WrappedRowCollector(SourceFunction.SourceContext<WrappedRow> 
internalCollector) {
+    public WrappedRowCollector(SourceFunction.SourceContext<Row> 
internalCollector) {
         this.internalCollector = internalCollector;
     }
 
     @Override
     public void collect(SeaTunnelRow record) {
         try {
-            internalCollector.collect(new 
WrappedRow(rowSerialization.serialize(record), ""));
+            internalCollector.collect(rowSerialization.serialize(record));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
similarity index 53%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to 
seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
index d25361f1..accd0c11 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
@@ -15,12 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.flink.utils;
 
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> {
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeConverterUtilsTest {
+
+    @Test
+    public void convertType() {
+        SeaTunnelDataType<?> intBasicType = BasicType.INTEGER;
+        Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
TypeConverterUtils.convertType(intBasicType));
 
+        BasicType<Long> longBasicType = BasicType.LONG;
+        Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
TypeConverterUtils.convertType(longBasicType));
 
+    }
 }

Reply via email to