This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d46cf16e5a [Feature] Split transform and move jar into connectors
directory (#7218)
d46cf16e5a is described below
commit d46cf16e5a4bb0e4820feaf4dd03e7e9038d6281
Author: Jarvis <[email protected]>
AuthorDate: Sat Aug 10 23:47:12 2024 +0800
[Feature] Split transform and move jar into connectors directory (#7218)
---
plugin-mapping.properties | 13 +++++-
.../core/starter/execution/PluginUtil.java | 16 +++----
.../flink/execution/TransformExecuteProcessor.java | 14 +++++-
.../spark/execution/TransformExecuteProcessor.java | 11 ++++-
.../src/main/assembly/assembly-bin-ci.xml | 48 ++++++++------------
seatunnel-dist/src/main/assembly/assembly-bin.xml | 6 +--
.../seatunnel/e2e/common/util/ContainerUtil.java | 4 +-
.../core/parse/MultipleTableJobConfigParser.java | 36 ++++++++++-----
.../SeaTunnelTransformPluginDiscovery.java | 2 +-
.../common/AbstractCatalogSupportTransform.java | 16 ++++++-
.../common/AbstractSeaTunnelTransform.java | 51 ----------------------
.../seatunnel/transform/sql/SQLTransform.java | 15 ++-----
12 files changed, 108 insertions(+), 124 deletions(-)
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1942f875d7..579bf2dac0 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -129,4 +129,15 @@ seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
-seatunnel.sink.ActiveMQ = connector-activemq
\ No newline at end of file
+seatunnel.sink.ActiveMQ = connector-activemq
+
+seatunnel.transform.Sql = seatunnel-transforms-v2
+seatunnel.transform.FieldMapper = seatunnel-transforms-v2
+seatunnel.transform.Filter = seatunnel-transforms-v2
+seatunnel.transform.FilterRowKind = seatunnel-transforms-v2
+seatunnel.transform.JsonPath = seatunnel-transforms-v2
+seatunnel.transform.Replace = seatunnel-transforms-v2
+seatunnel.transform.Split = seatunnel-transforms-v2
+seatunnel.transform.Copy = seatunnel-transforms-v2
+seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
+seatunnel.transform.LLM = seatunnel-transforms-v2
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0dc4209a8b..166e581e2d 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryException;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -49,7 +48,6 @@ import java.util.Optional;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
-import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
public class PluginUtil {
@@ -130,21 +128,21 @@ public class PluginUtil {
return source;
}
- public static TableTransformFactory createTransformFactory(
+ public static Optional<? extends Factory> createTransformFactory(
+ SeaTunnelFactoryDiscovery factoryDiscovery,
SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
Config transformConfig,
List<URL> pluginJars) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
ENGINE_TYPE, "transform",
transformConfig.getString(PLUGIN_NAME.key()));
- final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(transformConfig);
- final String factoryId = readonlyConfig.get(PLUGIN_NAME);
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
- final TableTransformFactory factory =
- discoverFactory(classLoader, TableTransformFactory.class,
factoryId);
pluginJars.addAll(
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- return factory;
+ try {
+ return
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+ } catch (FactoryException e) {
+ return Optional.empty();
+ }
}
public static Optional<? extends Factory> createSinkFactory(
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d91bb9d3da..1ff2cf6437 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -41,6 +42,7 @@ import org.apache.flink.types.Row;
import java.net.URL;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -59,15 +61,23 @@ public class TransformExecuteProcessor
@Override
protected List<TableTransformFactory> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableTransformFactory.class,
ADD_URL_TO_CLASSLOADER);
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
-
return pluginConfigs.stream()
.map(
transformConfig ->
PluginUtil.createTransformFactory(
- transformPluginDiscovery,
transformConfig, jarPaths))
+ factoryDiscovery,
+ transformPluginDiscovery,
+ transformConfig,
+ jarPaths))
.distinct()
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index bc7cd5cdbe..fc4a9e00d0 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -50,6 +51,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -69,16 +71,23 @@ public class TransformExecuteProcessor
protected List<TableTransformFactory> initializePlugins(List<? extends
Config> pluginConfigs) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
+
+ SeaTunnelFactoryDiscovery factoryDiscovery =
+ new SeaTunnelFactoryDiscovery(TableTransformFactory.class);
List<URL> pluginJars = new ArrayList<>();
List<TableTransformFactory> transforms =
pluginConfigs.stream()
.map(
transformConfig ->
PluginUtil.createTransformFactory(
+ factoryDiscovery,
transformPluginDiscovery,
transformConfig,
- pluginJars))
+ new ArrayList<>()))
.distinct()
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
return transforms;
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index cc48ac86a2..4510579d81 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -140,7 +140,7 @@
<scope>provided</scope>
</dependencySet>
- <!-- ============ Connectors Jars ============ -->
+ <!-- ============ Connectors Jars And Transforms V2 Jar ============
-->
<!-- SeaTunnel connectors -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
@@ -148,6 +148,7 @@
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:connector-*:jar</include>
+
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<excludes>
<exclude>org.apache.seatunnel:connector-common</exclude>
@@ -160,36 +161,7 @@
<scope>provided</scope>
</dependencySet>
- <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3
Uber Jar ============ -->
- <dependencySet>
- <useProjectArtifact>false</useProjectArtifact>
- <useTransitiveDependencies>true</useTransitiveDependencies>
- <unpack>false</unpack>
- <includes>
-
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
- <include>org.apache.hadoop:hadoop-aws:jar</include>
- <include>com.amazonaws:aws-java-sdk-bundle:jar</include>
-
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
- <!--Add hadoop aliyun jar -->
- <include>org.apache.hadoop:hadoop-aliyun:jar</include>
- <include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
- <include>org.jdom:jdom:jar</include>
-
- <!--Add netty buffer jar -->
- <include>io.netty:netty-buffer:jar</include>
- <include>io.netty:netty-common:jar</include>
-
- <!--Add hive exec jar -->
- <include>org.apache.hive:hive-exec:jar</include>
- <include>org.apache.hive:hive-service:jar</include>
- <include>org.apache.thrift:libfb303:jar</include>
- </includes>
-
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
- <outputDirectory>/lib</outputDirectory>
- <scope>provided</scope>
- </dependencySet>
-
- <!-- =================== JDBC Connector Drivers ===================
-->
+ <!-- =================== JDBC Connector Drivers And SeaTunnel Hadoop3
Uber Jar =================== -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
@@ -209,6 +181,20 @@
<include>com.amazon.redshift:redshift-jdbc42:jar</include>
<include>net.snowflake.snowflake-jdbc:jar</include>
<include>com.xugudb:xugu-jdbc:jar</include>
+ <include>org.apache.hadoop:hadoop-aws:jar</include>
+ <include>com.amazonaws:aws-java-sdk-bundle:jar</include>
+
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
+ <!--Add hadoop aliyun jar -->
+ <include>org.apache.hadoop:hadoop-aliyun:jar</include>
+ <include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
+ <include>org.jdom:jdom:jar</include>
+ <!--Add netty buffer jar -->
+ <include>io.netty:netty-buffer:jar</include>
+ <include>io.netty:netty-common:jar</include>
+ <!--Add hive exec jar -->
+ <include>org.apache.hive:hive-exec:jar</include>
+ <include>org.apache.hive:hive-service:jar</include>
+ <include>org.apache.thrift:libfb303:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 30fc5a6336..f16841f7a9 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -161,13 +161,12 @@
<scope>provided</scope>
</dependencySet>
- <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3
Uber Jar============ -->
+ <!-- ============ SeaTunnel Hadoop3 Uber Jar============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
-
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
@@ -175,7 +174,7 @@
<scope>provided</scope>
</dependencySet>
- <!-- ============ Connectors Jars ============ -->
+ <!-- ============ Connectors Jars And Transforms V2 Jar ============
-->
<!-- SeaTunnel connectors for Demo -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
@@ -184,6 +183,7 @@
<includes>
<include>org.apache.seatunnel:connector-fake:jar</include>
<include>org.apache.seatunnel:connector-console:jar</include>
+
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<outputDirectory>/connectors</outputDirectory>
<scope>provided</scope>
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 1c590bb69a..6c6a8e5cdd 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -195,13 +195,13 @@ public final class ContainerUtil {
MountableFile.forHostPath(startJarPath),
Paths.get(seatunnelHomeInContainer, "starter",
startJarName).toString());
- // copy lib
+ // copy transform
String transformJar = "seatunnel-transforms-v2.jar";
Path transformJarPath =
Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2",
"target", transformJar);
container.withCopyFileToContainer(
MountableFile.forHostPath(transformJarPath),
- Paths.get(seatunnelHomeInContainer, "lib",
transformJar).toString());
+ Paths.get(seatunnelHomeInContainer, "connectors",
transformJar).toString());
// copy bin
final String startBinPath = startModulePath + File.separator +
"src/main/bin/";
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 40a6640c35..d02a76a4c5 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -182,7 +182,7 @@ public class MultipleTableJobConfigParser {
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());
- List<URL> connectorJars = getConnectorJarList(sourceConfigs,
sinkConfigs);
+ List<URL> connectorJars = getConnectorJarList(sourceConfigs,
transformConfigs, sinkConfigs);
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
@@ -238,18 +238,32 @@ public class MultipleTableJobConfigParser {
}
private List<URL> getConnectorJarList(
- List<? extends Config> sourceConfigs, List<? extends Config>
sinkConfigs) {
+ List<? extends Config> sourceConfigs,
+ List<? extends Config> transformConfigs,
+ List<? extends Config> sinkConfigs) {
List<PluginIdentifier> factoryIds =
Stream.concat(
- sourceConfigs.stream()
- .map(ConfigParserUtil::getFactoryId)
- .map(
- factory ->
- PluginIdentifier.of(
-
CollectionConstants
-
.SEATUNNEL_PLUGIN,
-
CollectionConstants.SOURCE_PLUGIN,
- factory)),
+ Stream.concat(
+ sourceConfigs.stream()
+
.map(ConfigParserUtil::getFactoryId)
+ .map(
+ factory ->
+
PluginIdentifier.of(
+
CollectionConstants
+
.SEATUNNEL_PLUGIN,
+
CollectionConstants
+
.SOURCE_PLUGIN,
+
factory)),
+ transformConfigs.stream()
+
.map(ConfigParserUtil::getFactoryId)
+ .map(
+ factory ->
+
PluginIdentifier.of(
+
CollectionConstants
+
.SEATUNNEL_PLUGIN,
+
CollectionConstants
+
.TRANSFORM_PLUGIN,
+
factory))),
sinkConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 445bf14628..606cd0d7ca 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -34,7 +34,7 @@ import java.util.List;
public class SeaTunnelTransformPluginDiscovery extends
AbstractPluginDiscovery<SeaTunnelTransform> {
public SeaTunnelTransformPluginDiscovery() {
- super(Common.libDir());
+ super(Common.connectorDir());
}
@Override
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 5670bcc129..632d3af1e4 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import lombok.NonNull;
-public abstract class AbstractCatalogSupportTransform extends
AbstractSeaTunnelTransform {
+public abstract class AbstractCatalogSupportTransform implements
SeaTunnelTransform<SeaTunnelRow> {
protected CatalogTable inputCatalogTable;
protected volatile CatalogTable outputCatalogTable;
@@ -32,6 +34,18 @@ public abstract class AbstractCatalogSupportTransform
extends AbstractSeaTunnelT
this.inputCatalogTable = inputCatalogTable;
}
+ @Override
+ public SeaTunnelRow map(SeaTunnelRow row) {
+ return transformRow(row);
+ }
+
+ /**
+ * Outputs transformed row data.
+ *
+ * @param inputRow upstream input row data
+ */
+ protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
+
@Override
public CatalogTable getProducedCatalogTable() {
if (outputCatalogTable == null) {
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
deleted file mode 100644
index 1892881c27..0000000000
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.common;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-
-public abstract class AbstractSeaTunnelTransform implements
SeaTunnelTransform<SeaTunnelRow> {
-
- protected String inputTableName;
- protected SeaTunnelRowType inputRowType;
-
- protected SeaTunnelRowType outputRowType;
-
- @Override
- public SeaTunnelRow map(SeaTunnelRow row) {
- return transformRow(row);
- }
-
- /**
- * Outputs transformed row data.
- *
- * @param inputRow upstream input row data
- */
- protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
- @Override
- public CatalogTable getProducedCatalogTable() {
- throw new UnsupportedOperationException(
- String.format(
- "Connector %s must implement
TableTransformFactory.createTransform method",
- getPluginName()));
- }
-}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index a9d04b0739..00316bba8e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -62,6 +62,8 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
private transient SQLEngine sqlEngine;
+ private final String inputTableName;
+
public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable
catalogTable) {
super(catalogTable);
this.query = config.get(KEY_QUERY);
@@ -77,15 +79,6 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
} else {
this.inputTableName = catalogTable.getTableId().getTableName();
}
- List<Column> columns = catalogTable.getTableSchema().getColumns();
- String[] fieldNames = new String[columns.size()];
- SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType<?>[columns.size()];
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
- fieldNames[i] = column.getName();
- fieldTypes[i] = column.getDataType();
- }
- this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
}
@Override
@@ -98,8 +91,8 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
sqlEngine.init(
inputTableName,
- inputCatalogTable != null ?
inputCatalogTable.getTableId().getTableName() : null,
- inputRowType,
+ inputCatalogTable.getTableId().getTableName(),
+ inputCatalogTable.getSeaTunnelRowType(),
query);
}