This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 068c5e3e3e [Core][Flink] refactor flink proxy source/sink (#7355)
068c5e3e3e is described below
commit 068c5e3e3e8518e8cef2bd7599d85455d1adbaef
Author: Tyrantlucifer <[email protected]>
AuthorDate: Mon Aug 12 12:55:41 2024 +0800
[Core][Flink] refactor flink proxy source/sink (#7355)
---
.../seatunnel/api/table/type/SeaTunnelRow.java | 15 ++
.../core/starter/execution/PluginUtil.java | 1 +
.../core/starter/execution/SourceTableInfo.java | 1 +
.../flink/execution/FlinkRuntimeEnvironment.java | 25 ---
.../flink/execution/SinkExecuteProcessor.java | 12 +-
.../execution/AbstractFlinkRuntimeEnvironment.java | 18 --
.../flink/execution/DataStreamTableInfo.java | 8 +-
.../FlinkAbstractPluginExecuteProcessor.java | 22 +--
.../flink/execution/FlinkRuntimeEnvironment.java | 25 ---
.../flink/execution/SinkExecuteProcessor.java | 13 +-
.../flink/execution/SourceExecuteProcessor.java | 8 +-
.../flink/execution/TransformExecuteProcessor.java | 53 ++----
.../seatunnel/engine/e2e/UnifyEnvParameterIT.java | 16 --
.../seatunnel-flink-connector-v2-example/pom.xml | 6 +
.../main/resources/examples/fake_to_console.conf | 12 +-
.../plugin/discovery/AbstractPluginDiscovery.java | 1 +
.../flink/utils/TypeConverterUtilsTest.java | 161 ----------------
.../flink/serialization/FlinkRowConverter.java | 154 ---------------
.../translation/flink/sink/FlinkSinkWriter.java | 16 +-
.../flink/source/FlinkRowCollector.java | 21 +--
.../translation/flink/source/FlinkSource.java | 17 +-
.../flink/source/FlinkSourceReader.java | 12 +-
.../flink/utils/TypeConverterUtils.java | 210 ---------------------
23 files changed, 103 insertions(+), 724 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 95a36b796c..11388dbb6a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -50,7 +50,16 @@ public final class SeaTunnelRow implements Serializable {
this.tableId = tableId;
}
+ /**
+ * The method will be removed in the future, please use {@link #setKind(RowKind)} instead of
+ * it.
+ */
+ @Deprecated
public void setRowKind(RowKind kind) {
+ setKind(kind);
+ }
+
+ public void setKind(RowKind kind) {
this.kind = kind;
}
@@ -62,7 +71,13 @@ public final class SeaTunnelRow implements Serializable {
return tableId;
}
+ /** The method will be removed in the future, please use {@link #getKind()} instead of it. */
+ @Deprecated
public RowKind getRowKind() {
+ return getKind();
+ }
+
+ public RowKind getKind() {
return this.kind;
}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 166e581e2d..c47ea0b121 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -50,6 +50,7 @@ import static
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
+@SuppressWarnings("rawtypes")
public class PluginUtil {
protected static final String ENGINE_TYPE = "seatunnel";
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
index 529b9b4207..43642f5735 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java
@@ -27,6 +27,7 @@ import java.util.List;
@Data
@AllArgsConstructor
+@SuppressWarnings("rawtypes")
public class SourceTableInfo {
private SeaTunnelSource source;
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index fcc25a6b9e..e3428c751e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -21,13 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +43,6 @@ public class FlinkRuntimeEnvironment extends
AbstractFlinkRuntimeEnvironment
@Override
public FlinkRuntimeEnvironment prepare() {
createStreamEnvironment();
- createStreamTableEnvironment();
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
@@ -63,23 +55,6 @@ public class FlinkRuntimeEnvironment extends
AbstractFlinkRuntimeEnvironment
return this;
}
- private void createStreamTableEnvironment() {
- EnvironmentSettings environmentSettings =
-
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
- tableEnvironment =
- StreamTableEnvironment.create(getStreamExecutionEnvironment(),
environmentSettings);
- TableConfig config = tableEnvironment.getConfig();
- if (EnvironmentUtil.hasPathAndWaring(this.config,
ConfigKeyName.MAX_STATE_RETENTION_TIME)
- && EnvironmentUtil.hasPathAndWaring(
- this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
- long max =
this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
- long min =
this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
- config.setIdleStateRetentionTime(Time.seconds(min),
Time.seconds(max));
- }
- // init flink table env config
- EnvironmentUtil.initTableEnvironmentConfiguration(this.config,
config.getConfiguration());
- }
-
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6a272aadb2..51586beaf0 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -51,6 +51,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
@@ -107,12 +108,15 @@ public class SinkExecuteProcessor
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
- SeaTunnelRowType sourceType =
stream.getCatalogTable().getSeaTunnelRowType();
+ // TODO support multi-table sink
+ SeaTunnelRowType sourceType =
+ stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
+ // TODO support multi-table sink
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
- stream.getCatalogTable(),
+ stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
@@ -134,8 +138,8 @@ public class SinkExecuteProcessor
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
- .sinkTo(new FlinkSink<>(sink,
stream.getCatalogTable()))
- .name(sink.getPluginName());
+ .sinkTo(new FlinkSink<>(sink,
stream.getCatalogTables().get(0)))
+ .name(String.format("%s-Sink",
sink.getPluginName()));
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index d805c286f8..34d9184277 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
@@ -37,11 +36,8 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
import org.apache.flink.util.TernaryBoolean;
import lombok.extern.slf4j.Slf4j;
@@ -55,10 +51,8 @@ import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractFlinkRuntimeEnvironment implements
RuntimeEnvironment {
- protected static final String RESULT_TABLE_NAME = "result_table_name";
protected Config config;
protected StreamExecutionEnvironment environment;
- protected StreamTableEnvironment tableEnvironment;
protected JobMode jobMode;
protected String jobName = Constants.LOGO;
@@ -78,10 +72,6 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
return EnvironmentUtil.checkRestartStrategy(config);
}
- public StreamTableEnvironment getStreamTableEnvironment() {
- return tableEnvironment;
- }
-
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}
@@ -228,14 +218,6 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
}
}
- public void registerResultTable(Config config, DataStream<Row> dataStream,
String name) {
- StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
- tableEnvironment.createTemporaryView(
- name, tableEnvironment.fromChangelogStream(dataStream));
- }
- }
-
public boolean isStreaming() {
return JobMode.STREAMING.equals(jobMode);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
index 7b158ee60b..a80a09b506 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java
@@ -18,20 +18,22 @@
package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
+import java.util.List;
+
@Data
@AllArgsConstructor
public class DataStreamTableInfo {
- private DataStream<Row> dataStream;
+ private DataStream<SeaTunnelRow> dataStream;
- private CatalogTable catalogTable;
+ private List<CatalogTable> catalogTables;
private String tableName;
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 565b7379bf..57956db56c 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -23,12 +23,6 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
import java.net.URL;
import java.net.URLClassLoader;
@@ -36,8 +30,6 @@ import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
-import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
-
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment> {
@@ -84,10 +76,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected Optional<DataStreamTableInfo> fromSourceTable(
Config pluginConfig, List<DataStreamTableInfo>
upstreamDataStreams) {
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- StreamTableEnvironment tableEnvironment =
- flinkRuntimeEnvironment.getStreamTableEnvironment();
String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
- Table table = tableEnvironment.from(tableName);
DataStreamTableInfo dataStreamTableInfo =
upstreamDataStreams.stream()
.filter(info ->
tableName.equals(info.getTableName()))
@@ -99,20 +88,13 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
"table %s not
found", tableName)));
return Optional.of(
new DataStreamTableInfo(
- TableUtil.tableToDataStream(tableEnvironment,
table),
- dataStreamTableInfo.getCatalogTable(),
+ dataStreamTableInfo.getDataStream(),
+ dataStreamTableInfo.getCatalogTables(),
tableName));
}
return Optional.empty();
}
- protected void registerResultTable(Config pluginConfig, DataStream<Row>
dataStream) {
- if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
- String resultTable =
pluginConfig.getString(RESULT_TABLE_NAME.key());
- flinkRuntimeEnvironment.registerResultTable(pluginConfig,
dataStream, resultTable);
- }
- }
-
protected abstract List<T> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index c1de8ff4f7..e3428c751e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -21,13 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +43,6 @@ public class FlinkRuntimeEnvironment extends
AbstractFlinkRuntimeEnvironment
@Override
public FlinkRuntimeEnvironment prepare() {
createStreamEnvironment();
- createStreamTableEnvironment();
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
@@ -63,23 +55,6 @@ public class FlinkRuntimeEnvironment extends
AbstractFlinkRuntimeEnvironment
return this;
}
- private void createStreamTableEnvironment() {
- EnvironmentSettings environmentSettings =
- EnvironmentSettings.newInstance().inStreamingMode().build();
- tableEnvironment =
- StreamTableEnvironment.create(getStreamExecutionEnvironment(),
environmentSettings);
- TableConfig config = tableEnvironment.getConfig();
- if (EnvironmentUtil.hasPathAndWaring(this.config,
ConfigKeyName.MAX_STATE_RETENTION_TIME)
- && EnvironmentUtil.hasPathAndWaring(
- this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
- long max =
this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
- long min =
this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
- config.setIdleStateRetentionTime(Time.seconds(min),
Time.seconds(max));
- }
- // init flink table env config
- EnvironmentUtil.initTableEnvironmentConfiguration(this.config,
config.getConfiguration());
- }
-
public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 1424746455..c713593821 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+@SuppressWarnings("unchecked,rawtypes")
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends
Factory>> {
@@ -108,12 +109,15 @@ public class SinkExecuteProcessor
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
- SeaTunnelRowType sourceType =
stream.getCatalogTable().getSeaTunnelRowType();
+ // TODO sink support multi table
+ SeaTunnelRowType sourceType =
+ stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
+ // TODO sink support multi table
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
- stream.getCatalogTable(),
+ stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
@@ -137,8 +141,9 @@ public class SinkExecuteProcessor
stream.getDataStream()
.sinkTo(
SinkV1Adapter.wrap(
- new FlinkSink<>(sink,
stream.getCatalogTable())))
- .name(sink.getPluginName());
+ new FlinkSink<>(
+ sink,
stream.getCatalogTables().get(0))))
+ .name(String.format("%s-Sink",
sink.getPluginName()));
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 20b74f4b71..eeb757a853 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
@@ -71,21 +72,20 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
Config pluginConfig = pluginConfigs.get(i);
FlinkSource flinkSource = new FlinkSource<>(internalSource,
envConfig);
- DataStreamSource sourceStream =
+ DataStreamSource<SeaTunnelRow> sourceStream =
executionEnvironment.fromSource(
flinkSource,
WatermarkStrategy.noWatermarks(),
- String.format("%s-source",
internalSource.getPluginName()));
+ String.format("%s-Source",
internalSource.getPluginName()));
if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
- registerResultTable(pluginConfig, sourceStream);
sources.add(
new DataStreamTableInfo(
sourceStream,
- sourceTableInfo.getCatalogTables().get(0),
+ sourceTableInfo.getCatalogTables(),
pluginConfig.hasPath(RESULT_TABLE_NAME.key())
?
pluginConfig.getString(RESULT_TABLE_NAME.key())
: null));
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 1ff2cf6437..c92eaf42a9 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,19 +25,15 @@ import
org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
+import org.apache.flink.streaming.api.operators.StreamMap;
import java.net.URL;
import java.util.Collections;
@@ -47,6 +43,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
+@SuppressWarnings("unchecked,rawtypes")
public class TransformExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
@@ -97,21 +94,20 @@ public class TransformExecuteProcessor
TableTransformFactory factory = plugins.get(i);
TableTransformFactoryContext context =
new TableTransformFactoryContext(
-
Collections.singletonList(stream.getCatalogTable()),
+ stream.getCatalogTables(),
ReadonlyConfig.fromConfig(pluginConfig),
classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
SeaTunnelTransform transform =
factory.createTransform(context).createTransform();
- SeaTunnelRowType sourceType =
stream.getCatalogTable().getSeaTunnelRowType();
transform.setJobContext(jobContext);
- DataStream<Row> inputStream =
- flinkTransform(sourceType, transform,
stream.getDataStream());
- registerResultTable(pluginConfig, inputStream);
+ DataStream<SeaTunnelRow> inputStream =
+ flinkTransform(transform, stream.getDataStream());
+ // TODO transform support multi tables
upstreamDataStreams.add(
new DataStreamTableInfo(
inputStream,
- transform.getProducedCatalogTable(),
+
Collections.singletonList(transform.getProducedCatalogTable()),
pluginConfig.hasPath(RESULT_TABLE_NAME.key())
?
pluginConfig.getString(RESULT_TABLE_NAME.key())
: null));
@@ -126,28 +122,17 @@ public class TransformExecuteProcessor
return upstreamDataStreams;
}
- protected DataStream<Row> flinkTransform(
- SeaTunnelRowType sourceType, SeaTunnelTransform transform,
DataStream<Row> stream) {
- TypeInformation rowTypeInfo =
- TypeConverterUtils.convert(
-
transform.getProducedCatalogTable().getSeaTunnelRowType());
- FlinkRowConverter transformInputRowConverter = new
FlinkRowConverter(sourceType);
- FlinkRowConverter transformOutputRowConverter =
- new
FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
- DataStream<Row> output =
- stream.flatMap(
- (FlatMapFunction<Row, Row>)
- (value, out) -> {
- SeaTunnelRow seaTunnelRow =
-
transformInputRowConverter.reconvert(value);
- SeaTunnelRow dataRow =
- (SeaTunnelRow)
transform.map(seaTunnelRow);
- if (dataRow != null) {
- Row copy =
transformOutputRowConverter.convert(dataRow);
- out.collect(copy);
- }
- },
- rowTypeInfo);
- return output;
+ protected DataStream<SeaTunnelRow> flinkTransform(
+ SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
+ return stream.transform(
+ String.format("%s-Transform", transform.getPluginName()),
+ TypeInformation.of(SeaTunnelRow.class),
+ new StreamMap<>(
+ flinkRuntimeEnvironment
+ .getStreamExecutionEnvironment()
+ .clean(
+ row ->
+
((SeaTunnelTransform<SeaTunnelRow>) transform)
+ .map(row))));
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
index ad5c6365f2..48485eb69c 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java
@@ -126,22 +126,6 @@ public class UnifyEnvParameterIT extends TestSuiteBase {
}
Assertions.assertNotNull(jobInfoReference.get());
});
- Map<String, Object> jobInfo = jobInfoReference.get();
-
- /**
- * 'table.exec.resource.default-parallelism' has a higher priority
than 'parallelism', so
- * one of these nodes must have a parallelism of 2.
- */
- Map<String, Object> plan = (Map<String, Object>) jobInfo.get("plan");
- List<Map<String, Object>> nodes = (List<Map<String, Object>>)
plan.get("nodes");
- boolean tableExecParallelism = false;
- for (Map<String, Object> node : nodes) {
- int parallelism = (int) node.get("parallelism");
- if (!tableExecParallelism && parallelism == 2) {
- tableExecParallelism = true;
- }
- }
- Assertions.assertTrue(tableExecParallelism);
}
public void genericTest(String configPath, AbstractTestFlinkContainer
container)
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 99c75d324a..ef801bdb9c 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -112,6 +112,12 @@
<version>${flink.1.15.3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web</artifactId>
+ <version>${flink.1.15.3.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 799dac7996..12c1f9f281 100644
---
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -19,7 +19,7 @@
######
env {
- job.mode = "BATCH"
+ job.mode = "STREAMING"
parallelism = 2
}
@@ -41,15 +41,21 @@ source {
}
transform {
-
+ Copy {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ fields {
+ name1 = name
+ }
+ }
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform-v2
}
sink {
Console {
+ source_table_name = "fake1"
}
-
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index d4bd43c3d1..4b62895f18 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -66,6 +66,7 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@Slf4j
+@SuppressWarnings("unchecked")
public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T>
{
private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
deleted file mode 100644
index 95cfa335e7..0000000000
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TypeConverterUtilsTest {
- // --------------------------------------------------------------
- // basic types test
- // --------------------------------------------------------------
-
- @Test
- public void convertStringType() {
- Assertions.assertEquals(
- BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(BasicType.STRING_TYPE));
- }
-
- @Test
- public void convertIntegerType() {
- Assertions.assertEquals(
- BasicTypeInfo.INT_TYPE_INFO, TypeConverterUtils.convert(BasicType.INT_TYPE));
- }
-
- @Test
- public void convertBooleanType() {
- Assertions.assertEquals(
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- TypeConverterUtils.convert(BasicType.BOOLEAN_TYPE));
- }
-
- @Test
- public void convertDoubleType() {
- Assertions.assertEquals(
- BasicTypeInfo.DOUBLE_TYPE_INFO, TypeConverterUtils.convert(BasicType.DOUBLE_TYPE));
- }
-
- @Test
- public void convertLongType() {
- Assertions.assertEquals(
- BasicTypeInfo.LONG_TYPE_INFO, TypeConverterUtils.convert(BasicType.LONG_TYPE));
- }
-
- @Test
- public void convertFloatType() {
- Assertions.assertEquals(
- BasicTypeInfo.FLOAT_TYPE_INFO, TypeConverterUtils.convert(BasicType.FLOAT_TYPE));
- }
-
- @Test
- public void convertByteType() {
- Assertions.assertEquals(
- BasicTypeInfo.BYTE_TYPE_INFO, TypeConverterUtils.convert(BasicType.BYTE_TYPE));
- }
-
- @Test
- public void convertShortType() {
- Assertions.assertEquals(
- BasicTypeInfo.SHORT_TYPE_INFO, TypeConverterUtils.convert(BasicType.SHORT_TYPE));
- }
-
- @Test
- public void convertBigDecimalType() {
- /**
- * To solve lost precision and scale of {@link
- * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
- * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the convert
- * result of {@link org.apache.seatunnel.api.table.type.DecimalType} instance.
- */
- Assertions.assertEquals(
- BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2)));
- }
-
- @Test
- public void convertNullType() {
- Assertions.assertEquals(
- BasicTypeInfo.VOID_TYPE_INFO, TypeConverterUtils.convert(BasicType.VOID_TYPE));
- }
-
- // --------------------------------------------------------------
- // array types test
- // --------------------------------------------------------------
-
- @Test
- public void convertBooleanArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.BOOLEAN_ARRAY_TYPE));
- }
-
- @Test
- public void convertStringArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.STRING_ARRAY_TYPE));
- }
-
- @Test
- public void convertDoubleArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.DOUBLE_ARRAY_TYPE));
- }
-
- @Test
- public void convertIntegerArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.INT_ARRAY_TYPE));
- }
-
- @Test
- public void convertLongArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.LONG_ARRAY_TYPE));
- }
-
- @Test
- public void convertFloatArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.FLOAT_ARRAY_TYPE));
- }
-
- @Test
- public void convertByteArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.BYTE_ARRAY_TYPE));
- }
-
- @Test
- public void convertShortArrayType() {
- Assertions.assertEquals(
- BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO,
- TypeConverterUtils.convert(ArrayType.SHORT_ARRAY_TYPE));
- }
-}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
deleted file mode 100644
index b24cb96dfe..0000000000
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.translation.serialization.RowConverter;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-/**
- * The row converter between {@link Row} and {@link SeaTunnelRow}, used to convert or reconvert
- * between flink row and seatunnel row
- */
-@Slf4j
-public class FlinkRowConverter extends RowConverter<Row> {
-
- public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
- super(dataType);
- }
-
- @Override
- public Row convert(SeaTunnelRow seaTunnelRow) throws IOException {
- validate(seaTunnelRow);
- return (Row) convert(seaTunnelRow, dataType);
- }
-
- private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
- if (field == null) {
- return null;
- }
- SqlType sqlType = dataType.getSqlType();
- switch (sqlType) {
- case ROW:
- SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- int arity = rowType.getTotalFields();
- Row engineRow = new Row(arity);
- for (int i = 0; i < arity; i++) {
- engineRow.setField(
- i, convert(seaTunnelRow.getField(i), rowType.getFieldType(i)));
- }
- engineRow.setKind(RowKind.fromByteValue(seaTunnelRow.getRowKind().toByteValue()));
- return engineRow;
- case MAP:
- return convertMap(
- (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
-
- /**
- * To solve lost precision and scale of {@link
- * org.apache.seatunnel.api.table.type.DecimalType}, use {@link java.lang.String} as
- * the convert result of {@link java.math.BigDecimal} instance.
- */
- case DECIMAL:
- BigDecimal decimal = (BigDecimal) field;
- return decimal.toString();
- default:
- return field;
- }
- }
-
- private static Object convertMap(
- Map<?, ?> mapData,
- MapType<?, ?> mapType,
- BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
- if (mapData == null || mapData.isEmpty()) {
- return mapData;
- }
-
- Map<Object, Object> newMap = new HashMap<>(mapData.size());
- mapData.forEach(
- (key, value) -> {
- SeaTunnelDataType<?> keyType = mapType.getKeyType();
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- newMap.put(
- convertFunction.apply(key, keyType),
- convertFunction.apply(value, valueType));
- });
- return newMap;
- }
-
- @Override
- public SeaTunnelRow reconvert(Row engineRow) throws IOException {
- return (SeaTunnelRow) reconvert(engineRow, dataType);
- }
-
- private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
- if (field == null) {
- return null;
- }
- SqlType sqlType = dataType.getSqlType();
- switch (sqlType) {
- case ROW:
- Row engineRow = (Row) field;
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- int arity = rowType.getTotalFields();
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(arity);
- for (int i = 0; i < arity; i++) {
- seaTunnelRow.setField(
- i, reconvert(engineRow.getField(i), rowType.getFieldType(i)));
- }
- seaTunnelRow.setRowKind(
- org.apache.seatunnel.api.table.type.RowKind.fromByteValue(
- engineRow.getKind().toByteValue()));
- return seaTunnelRow;
- case MAP:
- return convertMap(
- (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
-
- /**
- * To solve lost precision and scale of {@link
- * org.apache.seatunnel.api.table.type.DecimalType}, create {@link
- * java.math.BigDecimal} instance from {@link java.lang.String} type field.
- */
- case DECIMAL:
- DecimalType decimalType = (DecimalType) dataType;
- String decimalData = (String) field;
- BigDecimal decimal = new BigDecimal(decimalData);
- decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
- return decimal;
- default:
- return field;
- }
- }
-}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 3c949f6480..725bf606f9 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -25,11 +25,9 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.types.Row;
import lombok.extern.slf4j.Slf4j;
@@ -54,7 +52,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow,
CommT, WriterStateT>
sinkWriter;
- private final FlinkRowConverter rowSerialization;
private final Counter sinkWriteCount;
@@ -73,7 +70,6 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
MetricsContext metricsContext) {
this.sinkWriter = sinkWriter;
this.checkpointId = checkpointId;
- this.rowSerialization = new FlinkRowConverter(dataType);
this.sinkWriteCount =
metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
this.sinkWriteBytes =
metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
@@ -86,15 +82,17 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
@Override
public void write(InputT element, SinkWriter.Context context) throws
IOException {
- if (element instanceof Row) {
- SeaTunnelRow seaTunnelRow = rowSerialization.reconvert((Row) element);
- sinkWriter.write(seaTunnelRow);
+ if (element == null) {
+ return;
+ }
+ if (element instanceof SeaTunnelRow) {
+ sinkWriter.write((SeaTunnelRow) element);
sinkWriteCount.inc();
- sinkWriteBytes.inc(seaTunnelRow.getBytesSize());
+ sinkWriteBytes.inc(((SeaTunnelRow) element).getBytesSize());
sinkWriterQPS.markEvent();
} else {
throw new InvalidClassException(
- "only support Flink Row at now, the element Class is " + element.getClass());
+ "only support SeaTunnelRow at now, the element Class is "
+ element.getClass());
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index 39b14d17d0..2ea584029e 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -25,26 +25,18 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.types.Row;
import lombok.extern.slf4j.Slf4j;
-/**
- * The implementation of {@link Collector} for flink engine, as a container for {@link SeaTunnelRow}
- * and convert {@link SeaTunnelRow} to {@link Row}.
- */
+/** The implementation of {@link Collector} for flink engine. */
@Slf4j
public class FlinkRowCollector implements Collector<SeaTunnelRow> {
- private ReaderOutput<Row> readerOutput;
-
- private final FlinkRowConverter rowSerialization;
+ private ReaderOutput<SeaTunnelRow> readerOutput;
private final FlowControlGate flowControlGate;
@@ -54,9 +46,7 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
private final Meter sourceReadQPS;
- public FlinkRowCollector(
- SeaTunnelRowType seaTunnelRowType, Config envConfig, MetricsContext metricsContext) {
- this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
+ public FlinkRowCollector(Config envConfig, MetricsContext metricsContext) {
this.flowControlGate =
FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
this.sourceReadCount =
metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
this.sourceReadBytes =
metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES);
@@ -67,8 +57,7 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
public void collect(SeaTunnelRow record) {
flowControlGate.audit(record);
try {
- Row row = rowSerialization.convert(record);
- readerOutput.collect(row);
+ readerOutput.collect(record);
sourceReadCount.inc();
sourceReadBytes.inc(record.getBytesSize());
sourceReadQPS.markEvent();
@@ -82,7 +71,7 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
return this;
}
- public FlinkRowCollector withReaderOutput(ReaderOutput<Row> readerOutput) {
+ public FlinkRowCollector withReaderOutput(ReaderOutput<SeaTunnelRow> readerOutput) {
this.readerOutput = readerOutput;
return this;
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index adf54eef4c..7868e6d3ef 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -24,9 +24,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
@@ -37,7 +35,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.types.Row;
import java.io.Serializable;
@@ -48,7 +45,8 @@ import java.io.Serializable;
* @param <EnumStateT> The generic type of enumerator state
*/
public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends
Serializable>
- implements Source<Row, SplitWrapper<SplitT>, EnumStateT>, ResultTypeQueryable<Row> {
+ implements Source<SeaTunnelRow, SplitWrapper<SplitT>, EnumStateT>,
+ ResultTypeQueryable<SeaTunnelRow> {
private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
@@ -68,14 +66,13 @@ public class FlinkSource<SplitT extends SourceSplit,
EnumStateT extends Serializ
}
@Override
- public SourceReader<Row, SplitWrapper<SplitT>> createReader(SourceReaderContext readerContext)
- throws Exception {
+ public SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> createReader(
+ SourceReaderContext readerContext) throws Exception {
org.apache.seatunnel.api.source.SourceReader.Context context =
new FlinkSourceReaderContext(readerContext, source);
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT>
reader =
source.createReader(context);
- return new FlinkSourceReader<>(
- reader, context, envConfig, (SeaTunnelRowType) source.getProducedType());
+ return new FlinkSourceReader<>(reader, context, envConfig);
}
@Override
@@ -110,7 +107,7 @@ public class FlinkSource<SplitT extends SourceSplit,
EnumStateT extends Serializ
}
@Override
- public TypeInformation<Row> getProducedType() {
- return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
+ public TypeInformation<SeaTunnelRow> getProducedType() {
+ return TypeInformation.of(SeaTunnelRow.class);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 65dc432477..c2f9cde500 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -21,13 +21,11 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@ import java.util.stream.Collectors;
* @param <SplitT>
*/
public class FlinkSourceReader<SplitT extends SourceSplit>
- implements SourceReader<Row, SplitWrapper<SplitT>> {
+ implements SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> {
private final Logger LOGGER =
LoggerFactory.getLogger(FlinkSourceReader.class);
@@ -58,12 +56,10 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
public FlinkSourceReader(
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT>
sourceReader,
org.apache.seatunnel.api.source.SourceReader.Context context,
- Config envConfig,
- SeaTunnelRowType seaTunnelRowType) {
+ Config envConfig) {
this.sourceReader = sourceReader;
this.context = context;
- this.flinkRowCollector =
- new FlinkRowCollector(seaTunnelRowType, envConfig, context.getMetricsContext());
+ this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext());
}
@Override
@@ -76,7 +72,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
}
@Override
- public InputStatus pollNext(ReaderOutput<Row> output) throws Exception {
+ public InputStatus pollNext(ReaderOutput<SeaTunnelRow> output) throws Exception {
if (!((FlinkSourceReaderContext) context).isSendNoMoreElementEvent()) {
sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
} else {
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
deleted file mode 100644
index ebb77da268..0000000000
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TypeConverterUtils {
-
- private static final Map<Class<?>, BridgedType> BRIDGED_TYPES = new HashMap<>(32);
-
- static {
- // basic types
- BRIDGED_TYPES.put(
- String.class,
- BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO));
- BRIDGED_TYPES.put(
- Boolean.class,
- BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO));
- BRIDGED_TYPES.put(
- Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO));
- BRIDGED_TYPES.put(
- Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO));
- BRIDGED_TYPES.put(
- Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Double.class,
- BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
- BRIDGED_TYPES.put(
- Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
- /**
- * To solve lost precision and scale of {@link
- * org.apache.seatunnel.api.table.type.DecimalType}, use {@link
- * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the payload of
- * {@link org.apache.seatunnel.api.table.type.DecimalType}.
- */
- BRIDGED_TYPES.put(
- BigDecimal.class,
- BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO));
-
- // data time types
- BRIDGED_TYPES.put(
- LocalDate.class,
- BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
- BRIDGED_TYPES.put(
- LocalTime.class,
- BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
- BRIDGED_TYPES.put(
- LocalDateTime.class,
- BridgedType.of(
- LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME));
- // basic array types
- BRIDGED_TYPES.put(
- byte[].class,
- BridgedType.of(
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- String[].class,
- BridgedType.of(
- ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Boolean[].class,
- BridgedType.of(
- ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Byte[].class,
- BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Short[].class,
- BridgedType.of(
- ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Integer[].class,
- BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Long[].class,
- BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Float[].class,
- BridgedType.of(
- ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Double[].class,
- BridgedType.of(
- ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO));
- }
-
- private TypeConverterUtils() {
- throw new UnsupportedOperationException(
- "TypeConverterUtils is a utility class and cannot be instantiated");
- }
-
- public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
- BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
- if (bridgedType != null) {
- return bridgedType.getSeaTunnelType();
- }
-
- if (dataType instanceof MapTypeInfo) {
- MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
- return new MapType<>(
- convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo()));
- }
- if (dataType instanceof RowTypeInfo) {
- RowTypeInfo typeInformation = (RowTypeInfo) dataType;
- String[] fieldNames = typeInformation.getFieldNames();
- SeaTunnelDataType<?>[] seaTunnelDataTypes =
- Arrays.stream(typeInformation.getFieldTypes())
- .map(TypeConverterUtils::convert)
- .toArray(SeaTunnelDataType[]::new);
- return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
- }
- throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType);
- }
-
- public static TypeInformation<?> convert(SeaTunnelDataType<?> dataType) {
- BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
- if (bridgedType != null) {
- return bridgedType.getFlinkType();
- }
-
- if (dataType instanceof MapType) {
- MapType<?, ?> mapType = (MapType<?, ?>) dataType;
- return new MapTypeInfo<>(
- convert(mapType.getKeyType()), convert(mapType.getValueType()));
- }
-
- if (dataType instanceof ArrayType) {
- ArrayType arrayType = (ArrayType) dataType;
- return ObjectArrayTypeInfo.getInfoFor(
- arrayType.getTypeClass(), convert(arrayType.getElementType()));
- }
-
- if (dataType instanceof SeaTunnelRowType) {
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- TypeInformation<?>[] types =
- Arrays.stream(rowType.getFieldTypes())
- .map(TypeConverterUtils::convert)
- .toArray(TypeInformation[]::new);
- return new RowTypeInfo(types, rowType.getFieldNames());
- }
- throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
- }
-
- public static class BridgedType {
- private final SeaTunnelDataType<?> seaTunnelType;
- private final TypeInformation<?> flinkType;
-
- private BridgedType(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
- this.seaTunnelType = seaTunnelType;
- this.flinkType = flinkType;
- }
-
- public static BridgedType of(
- SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
- return new BridgedType(seaTunnelType, flinkType);
- }
-
- public TypeInformation<?> getFlinkType() {
- return flinkType;
- }
-
- public SeaTunnelDataType<?> getSeaTunnelType() {
- return seaTunnelType;
- }
- }
-}