This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b23266e0 Add SeaTunnelRowTypeInfo to support source return record type (#1894)
b23266e0 is described below
commit b23266e0e1b6fad4dc95ad01ae09d66515194181
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 17 16:22:13 2022 +0800
Add SeaTunnelRowTypeInfo to support source return record type (#1894)
* Add SeaTunnelRowTypeInfo to support source return record type
---
.../seatunnel/api/common/SeaTunnelContext.java | 68 ++++++++++++++++++++++
.../seatunnel/api/source/SeaTunnelSource.java | 8 +++
.../apache/seatunnel/api/table/catalog/Column.java | 5 +-
.../api/table/type/SeaTunnelDataType.java | 4 +-
...nnelDataType.java => SeaTunnelRowTypeInfo.java} | 17 ++++--
.../seatunnel/fake/source/FakeSource.java | 10 ++++
.../command/SeaTunnelApiTaskExecuteCommand.java | 35 +++++++++--
.../seatunnel-flink-examples/pom.xml | 6 ++
.../seatunnel-translation-flink/pom.xml | 6 ++
.../flink/source/SeaTunnelParallelSource.java | 21 ++++---
.../flink/source/WrappedRowCollector.java | 8 +--
.../flink/utils/TypeConverterUtilsTest.java | 22 +++++--
12 files changed, 181 insertions(+), 29 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
new file mode 100644
index 00000000..271bc706
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.common;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.common.config.Common;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is used to store the context of the application. e.g. the table schema, catalog...etc.
+ */
+public final class SeaTunnelContext implements Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
+
+ // tableName -> tableSchema
+ private Map<String, TableSchema> tableSchemaMap = new
ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+
+ public static SeaTunnelContext getContext() {
+ return INSTANCE;
+ }
+
+ /**
+ * Put table schema.
+ *
+ * @param tableName table name
+ * @param tableSchema table schema
+ */
+ public void addSchema(String tableName, TableSchema tableSchema) {
+ tableSchemaMap.put(tableName, tableSchema);
+ }
+
+ /**
+ * Get table schema.
+ *
+ * @param tableName table name.
+ * @return table schema.
+ */
+ public Optional<TableSchema> getSchema(String tableName) {
+ return Optional.ofNullable(tableSchemaMap.get(tableName));
+ }
+
+ private SeaTunnelContext() {
+ // no-op
+ }
+
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index c1f145bc..cf48428a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.source;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import java.io.Serializable;
@@ -39,6 +40,13 @@ public interface SeaTunnelSource<T, SplitT extends
SourceSplit, StateT> extends
*/
Boundedness getBoundedness();
+ /**
+ * Get the row type information of the records produced by this source.
+ *
+ * @return SeaTunnel row type information.
+ */
+ SeaTunnelRowTypeInfo getRowTypeInfo();
+
/**
* Create source reader, used to produce data.
*
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index c1f5ed78..aea7fade 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -20,11 +20,14 @@ package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
-public abstract class Column {
+public abstract class Column implements Serializable {
+
+ private static final long serialVersionUID = -1L;
/**
* column name.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index d25361f1..2ab1b22c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -17,10 +17,12 @@
package org.apache.seatunnel.api.table.type;
+import java.io.Serializable;
+
/**
* Logic data type of column in SeaTunnel.
*/
-public interface SeaTunnelDataType<T> {
+public interface SeaTunnelDataType<T> extends Serializable {
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
similarity index 72%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
index d25361f1..bd721b9d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
@@ -17,10 +17,19 @@
package org.apache.seatunnel.api.table.type;
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> {
+import lombok.AllArgsConstructor;
+import lombok.Data;
+@Data
+@AllArgsConstructor
+public class SeaTunnelRowTypeInfo {
+ /**
+ * The field name of the {@link SeaTunnelRow}.
+ */
+ private final String[] fieldNames;
+ /**
+ * The type of the field.
+ */
+ private final SeaTunnelDataType<?>[] seaTunnelDataTypes;
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 074816b6..0e687517 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -23,7 +23,10 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
import com.google.auto.service.AutoService;
@@ -36,6 +39,13 @@ public class FakeSource implements
SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
return Boundedness.BOUNDED;
}
+ @Override
+ public SeaTunnelRowTypeInfo getRowTypeInfo() {
+ return new SeaTunnelRowTypeInfo(
+ new String[]{"name", "age", "timestamp"},
+ new SeaTunnelDataType<?>[]{BasicType.STRING, BasicType.INTEGER,
BasicType.LONG});
+ }
+
@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit>
createReader(SourceReader.Context readerContext) {
return new FakeSourceReader(readerContext);
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index 14a23cd5..8673ba02 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -26,10 +26,10 @@ import
org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
@@ -37,8 +37,12 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,15 +76,21 @@ public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs>
SeaTunnelParallelSource source = getSource(config);
// todo: add basic type
- Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
+ Sink<Row, Object, Object, Object> flinkSink = getSink(config);
registerPlugins(flinkEnvironment);
StreamExecutionEnvironment streamExecutionEnvironment =
flinkEnvironment.getStreamExecutionEnvironment();
// support multiple sources/sink
- DataStreamSource<WrappedRow> dataStream =
streamExecutionEnvironment.addSource(source);
+ DataStreamSource<Row> dataStream =
streamExecutionEnvironment.addSource(source);
+ registerTable(dataStream, "fake_table", "name, age", flinkEnvironment);
// todo: add transform
- dataStream.sinkTo(flinkSink);
+ DataStream<Row> transformOutputStream = TableUtil.tableToDataStream(
+ flinkEnvironment.getStreamTableEnvironment(),
+ flinkEnvironment.getStreamTableEnvironment().sqlQuery("select *
from fake_table"),
+ false);
+ // add sink
+ transformOutputStream.sinkTo(flinkSink);
try {
streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
} catch (Exception e) {
@@ -88,6 +98,19 @@ public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs>
}
}
+ private void registerTable(DataStream<Row> dataStream, String tableName,
String fields, FlinkEnvironment flinkEnvironment) {
+ StreamTableEnvironment streamTableEnvironment =
flinkEnvironment.getStreamTableEnvironment();
+ if (!TableUtil.tableExists(streamTableEnvironment, tableName)) {
+ streamTableEnvironment.registerDataStream(tableName, dataStream,
fields);
+ }
+ }
+
+ private DataStream<Row> fromSourceTable(FlinkEnvironment flinkEnvironment,
String tableName) {
+ StreamTableEnvironment streamTableEnvironment =
flinkEnvironment.getStreamTableEnvironment();
+ Table fakeTable = streamTableEnvironment.scan(tableName);
+ return TableUtil.tableToDataStream(streamTableEnvironment, fakeTable,
true);
+ }
+
private SeaTunnelParallelSource getSource(Config config) {
PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
// todo: use FactoryUtils to load the plugin
@@ -95,10 +118,10 @@ public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs>
return new
SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier));
}
- private Sink<WrappedRow, Object, Object, Object> getSink(Config config) {
+ private Sink<Row, Object, Object, Object> getSink(Config config) {
PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
- FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object>
flinkSinkConverter = new FlinkSinkConverter<>();
+ FlinkSinkConverter<SeaTunnelRow, Row, Object, Object, Object>
flinkSinkConverter = new FlinkSinkConverter<>();
return
flinkSinkConverter.convert(sinkPluginDiscovery.getPluginInstance(pluginIdentifier),
Collections.emptyMap());
}
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml
b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index bd5caf5e..9058a6fe 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -81,6 +81,12 @@
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml
b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index 114d4134..1d371cc8 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -36,6 +36,12 @@
<version>${project.version}</version>
</dependency>
<!-- apache flink table -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index 48fe0d1c..588d056d 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -19,8 +19,8 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -30,21 +30,24 @@ import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-public class SeaTunnelParallelSource extends
RichParallelSourceFunction<WrappedRow>
- implements CheckpointListener, ResultTypeQueryable<WrappedRow>,
CheckpointedFunction {
+public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
+ implements CheckpointListener, ResultTypeQueryable<Row>,
CheckpointedFunction {
private static final Logger LOG =
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
protected static final String PARALLEL_SOURCE_STATE_NAME =
"parallel-source-states";
@@ -75,7 +78,7 @@ public class SeaTunnelParallelSource extends
RichParallelSourceFunction<WrappedR
}
@Override
- public void run(SourceFunction.SourceContext<WrappedRow> sourceContext)
throws Exception {
+ public void run(SourceFunction.SourceContext<Row> sourceContext) throws
Exception {
parallelSource.run(new WrappedRowCollector(sourceContext));
}
@@ -100,9 +103,11 @@ public class SeaTunnelParallelSource extends
RichParallelSourceFunction<WrappedR
}
@Override
- public TypeInformation<WrappedRow> getProducedType() {
- // todo: add type transformation
- return new KryoTypeInfo<>(WrappedRow.class);
+ public TypeInformation<Row> getProducedType() {
+ SeaTunnelRowTypeInfo rowTypeInfo = source.getRowTypeInfo();
+ TypeInformation<?>[] typeInformation =
Arrays.stream(rowTypeInfo.getSeaTunnelDataTypes())
+
.map(TypeConverterUtils::convertType).toArray(TypeInformation[]::new);
+ return new RowTypeInfo(typeInformation, rowTypeInfo.getFieldNames());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
index 1d0ea225..f5b6649e 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
@@ -20,25 +20,25 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
-import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
import java.io.IOException;
public class WrappedRowCollector implements Collector<SeaTunnelRow> {
- protected final SourceFunction.SourceContext<WrappedRow> internalCollector;
+ protected final SourceFunction.SourceContext<Row> internalCollector;
protected final FlinkRowSerialization rowSerialization = new
FlinkRowSerialization();
- public WrappedRowCollector(SourceFunction.SourceContext<WrappedRow>
internalCollector) {
+ public WrappedRowCollector(SourceFunction.SourceContext<Row>
internalCollector) {
this.internalCollector = internalCollector;
}
@Override
public void collect(SeaTunnelRow record) {
try {
- internalCollector.collect(new
WrappedRow(rowSerialization.serialize(record), ""));
+ internalCollector.collect(rowSerialization.serialize(record));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
similarity index 53%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to
seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
index d25361f1..accd0c11 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
@@ -15,12 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.flink.utils;
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> {
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeConverterUtilsTest {
+
+ @Test
+ public void convertType() {
+ SeaTunnelDataType<?> intBasicType = BasicType.INTEGER;
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
TypeConverterUtils.convertType(intBasicType));
+ BasicType<Long> longBasicType = BasicType.LONG;
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
TypeConverterUtils.convertType(longBasicType));
+ }
}