This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b2b97ad7a [Feature][Transform-V2][Spark] Support transform-v2 for
spark (#3409)
b2b97ad7a is described below
commit b2b97ad7a765fc140ec240749b597b573c0198d3
Author: FWLamb <[email protected]>
AuthorDate: Tue Jan 3 14:21:40 2023 +0800
[Feature][Transform-V2][Spark] Support transform-v2 for spark (#3409)
* init
* update
* update
* update
* update
* update
* update
* update
* update
* update
* update
* update
* update
* Update TestSuiteBase.java
Co-authored-by: yangbinbin <[email protected]>
Co-authored-by: hailin0 <[email protected]>
---
.../spark/execution/TransformExecuteProcessor.java | 78 +++++++++++++++++-----
.../resources/assertion/fakesource_to_assert.conf | 3 -
.../resources/datahub/fakesource_to_datahub.conf | 6 --
.../jdbc/jdbc_mysql_source_and_sink_parallel.conf | 10 +--
...mysql_source_and_sink_parallel_upper_lower.conf | 10 +--
.../jdbc_postgres_source_and_sink_parallel.conf | 10 +--
...tgres_source_and_sink_parallel_upper_lower.conf | 9 ++-
.../seatunnel/e2e/transform/TestSuiteBase.java | 3 -
.../SeaTunnelSparkTransformPluginDiscovery.java | 32 ---------
9 files changed, 84 insertions(+), 77 deletions(-)
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 66d5e1ee6..8badcacc4 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -18,44 +18,56 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.translation.spark.common.serialization.InternalRowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.catalyst.expressions.MutableValue;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
+import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
-public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<BaseSparkTransform> {
+@Slf4j
+public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelTransform> {
private static final String PLUGIN_TYPE = "transform";
- protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
- JobContext jobContext,
- List<? extends Config> pluginConfigs) {
+ protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment, JobContext jobContext, List<? extends Config> pluginConfigs) {
super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override
- protected List<BaseSparkTransform> initializePlugins(List<? extends Config> pluginConfigs) {
- SeaTunnelSparkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelSparkTransformPluginDiscovery();
+ protected List<SeaTunnelTransform> initializePlugins(List<? extends Config> pluginConfigs) {
+ SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- List<BaseSparkTransform> transforms = pluginConfigs.stream()
+ List<SeaTunnelTransform> transforms = pluginConfigs.stream()
.map(transformConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- BaseSparkTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
- pluginInstance.setConfig(transformConfig);
- pluginInstance.prepare(sparkEnvironment);
+ SeaTunnelTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+ pluginInstance.prepare(transformConfig);
+ pluginInstance.setJobContext(jobContext);
return pluginInstance;
}).distinct().collect(Collectors.toList());
sparkEnvironment.registerPlugin(pluginJars);
@@ -70,13 +82,43 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
Dataset<Row> input = upstreamDataStreams.get(0);
List<Dataset<Row>> result = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
- BaseSparkTransform transform = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- Dataset<Row> stream = fromSourceTable(pluginConfig, sparkEnvironment).orElse(input);
- input = transform.process(stream, sparkEnvironment);
- registerInputTempView(pluginConfig, input);
- result.add(input);
+ try {
+ SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
+ Dataset<Row> stream = fromSourceTable(pluginConfig, sparkEnvironment).orElse(input);
+ input = sparkTransform(transform, stream);
+ registerInputTempView(pluginConfig, input);
+ result.add(input);
+ } catch (Exception e) {
+ throw new TaskExecuteException(
+ String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
+ }
}
return result;
}
+
+ private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream) throws IOException {
+ SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
+ transform.setTypeInfo(seaTunnelDataType);
+ StructType structType = (StructType) TypeConverterUtils.convert(transform.getProducedType());
+ SeaTunnelRow seaTunnelRow;
+ List<Row> outputRows = new ArrayList<>();
+ Iterator<Row> rowIterator = stream.toLocalIterator();
+ InternalRowConverter inputRowConverter = new InternalRowConverter(seaTunnelDataType);
+ InternalRowConverter outputRowConverter = new InternalRowConverter(transform.getProducedType());
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ seaTunnelRow = inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+ seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
+ if (seaTunnelRow == null) {
+ continue;
+ }
+ InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
+ outputRows.add(new GenericRowWithSchema(
+ Arrays.stream(((SpecificInternalRow) internalRow).values()).map(MutableValue::boxed).toArray(),
+ structType));
+ }
+ return sparkEnvironment.getSparkSession().createDataFrame(outputRows, structType);
+ }
+
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 4e512683f..615bec8cf 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -46,9 +46,6 @@ transform {
result_table_name = "fake1"
fields = ["name", "age"]
}
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
index edc66d6a2..888516aff 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -42,12 +42,6 @@ source {
}
transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
index 616810b2e..b19322cd9 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -35,14 +35,16 @@ source{
}
transform {
- sql {
- sql = "select name,age from jdbc"
- }
+ Filter {
+ source_table_name = "jdbc"
+ result_table_name = "jdbc1"
+ fields = ["name", "age"]
+ }
}
sink {
jdbc {
-
+ source_table_name = "jdbc1"
url = "jdbc:mysql://mysql:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index e589609c5..1f28801f5 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -37,14 +37,16 @@ source{
}
transform {
- sql {
- sql = "select name,age from jdbc"
- }
+ Filter {
+ source_table_name = "jdbc"
+ result_table_name = "jdbc1"
+ fields = ["name", "age"]
+ }
}
sink {
jdbc {
-
+ source_table_name = "jdbc1"
url = "jdbc:mysql://mysql:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
index 0b13a1530..22bb9d17f 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -35,14 +35,16 @@ source{
}
transform {
- sql {
- sql = "select name,age from jdbc"
- }
+ Filter {
+ source_table_name = "jdbc"
+ result_table_name = "jdbc1"
+ fields = ["name", "age"]
+ }
}
sink {
jdbc {
-
+ source_table_name = "jdbc1"
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 33e1d8a03..ec3bf4fdc 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -37,13 +37,16 @@ source{
}
transform {
- sql {
- sql = "select name,age from jdbc"
- }
+ Filter {
+ source_table_name = "jdbc"
+ result_table_name = "jdbc1"
+ fields = ["name", "age"]
+ }
}
sink {
jdbc {
+ source_table_name = "jdbc1"
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
index 2672fd466..41be966c1 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
@@ -17,11 +17,9 @@
package org.apache.seatunnel.e2e.transform;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainersFactory;
import org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestCaseInvocationContextProvider;
import org.apache.seatunnel.e2e.common.junit.TestContainers;
import org.apache.seatunnel.e2e.common.junit.TestLoggerExtension;
@@ -37,7 +35,6 @@ import org.testcontainers.containers.Network;
TestCaseInvocationContextProvider.class
})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@DisabledOnContainer(value = {}, type = {EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark isn't completed")
public abstract class TestSuiteBase {
protected static final Network NETWORK = TestContainer.NETWORK;
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
deleted file mode 100644
index bfc5358e7..000000000
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.plugin.discovery.seatunnel;
-
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-public class SeaTunnelSparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseSparkTransform> {
- public SeaTunnelSparkTransformPluginDiscovery() {
- super("seatunnel");
- }
-
- @Override
- protected Class<BaseSparkTransform> getPluginBaseClass() {
- return BaseSparkTransform.class;
- }
-}