hailin0 commented on code in PR #3409:
URL:
https://github.com/apache/incubator-seatunnel/pull/3409#discussion_r1045237157
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf:
##########
@@ -51,6 +53,7 @@ transform {
sink {
Assert {
+ source_table_name = "fake1"
Review Comment:
remove
##########
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java:
##########
@@ -18,44 +18,56 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.spark.SparkEnvironment;
+import
org.apache.seatunnel.translation.spark.common.serialization.InternalRowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.catalyst.expressions.MutableValue;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
+import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
-public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<BaseSparkTransform> {
+@Slf4j
+public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunnelTransform> {
private static final String PLUGIN_TYPE = "transform";
- protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
- JobContext jobContext,
- List<? extends Config> pluginConfigs) {
+ protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
JobContext jobContext, List<? extends Config> pluginConfigs) {
super(sparkEnvironment, jobContext, pluginConfigs);
}
@Override
- protected List<BaseSparkTransform> initializePlugins(List<? extends
Config> pluginConfigs) {
- SeaTunnelSparkTransformPluginDiscovery transformPluginDiscovery = new
SeaTunnelSparkTransformPluginDiscovery();
Review Comment:
Remove this file, since it has been replaced:
`org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery`
##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf:
##########
@@ -42,16 +42,16 @@ source {
}
transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ Filter {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ fields = ["name", "age"]
+ }
Review Comment:
remove
##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf:
##########
@@ -42,16 +42,16 @@ source {
}
transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ Filter {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ fields = ["name", "age"]
+ }
}
sink {
DataHub {
+ source_table_name = "fake1"
Review Comment:
remove
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf:
##########
@@ -41,8 +41,10 @@ source {
}
transform {
- sql {
- sql = "select name,age from fake"
+ Filter {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ fields = ["name", "age"]
Review Comment:
remove
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java:
##########
@@ -18,14 +18,17 @@
package org.apache.seatunnel.e2e.connector.assertion;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import java.io.IOException;
+@DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL,
EngineType.FLINK}, disabledReason = "TODO: Transform v2 translation to flink
isn't completed")
Review Comment:
revert
##########
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java:
##########
@@ -70,13 +82,39 @@ public List<Dataset<Row>> execute(List<Dataset<Row>>
upstreamDataStreams) throws
Dataset<Row> input = upstreamDataStreams.get(0);
List<Dataset<Row>> result = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
- BaseSparkTransform transform = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- Dataset<Row> stream = fromSourceTable(pluginConfig,
sparkEnvironment).orElse(input);
- input = transform.process(stream, sparkEnvironment);
- registerInputTempView(pluginConfig, input);
- result.add(input);
+ try {
+ SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
+ Dataset<Row> stream = fromSourceTable(pluginConfig,
sparkEnvironment).orElse(input);
+ input = sparkTransform(transform, stream);
+ registerInputTempView(pluginConfig, input);
+ result.add(input);
+ } catch (Exception e) {
+ throw new TaskExecuteException(
+ String.format("SeaTunnel transform task: %s execute
error", plugins.get(i).getPluginName()), e);
+ }
}
return result;
}
+
+ private Dataset<Row> sparkTransform(SeaTunnelTransform transform,
Dataset<Row> stream) throws IOException {
+ SeaTunnelDataType<?> seaTunnelDataType =
TypeConverterUtils.convert(stream.schema());
+ transform.setTypeInfo(seaTunnelDataType);
+ StructType structType = (StructType)
TypeConverterUtils.convert(transform.getProducedType());
+ List<Row> outputRows = new ArrayList<>();
+ Iterator<Row> rowIterator = stream.toLocalIterator();
+ SeaTunnelRow seaTunnelRow;
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ seaTunnelRow = new
InternalRowConverter(seaTunnelDataType).reconvert(InternalRow.apply(row.toSeq()));
+ seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
+ InternalRow internalRow = new
InternalRowConverter(transform.getProducedType()).convert(seaTunnelRow);
+ MutableValue[] mutableValues = ((SpecificInternalRow)
internalRow).values();
+ outputRows.add(new GenericRowWithSchema(
+
Arrays.stream(mutableValues).map(MutableValue::boxed).toArray(),
+ structType));
+ }
+ return sparkEnvironment.getSparkSession().createDataFrame(outputRows,
structType);
Review Comment:
Maybe this should use `Dataset<Row> stream = Dataset#map(...)` or `Dataset<Row>
stream = Dataset#transform(...)` instead of collecting all rows to the driver
via `toLocalIterator()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]