hailin0 commented on code in PR #3409:
URL: https://github.com/apache/incubator-seatunnel/pull/3409#discussion_r1045237157


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf:
##########
@@ -51,6 +53,7 @@ transform {
 
 sink {
   Assert {
+    source_table_name = "fake1"

Review Comment:
   Please remove this added line.



##########
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java:
##########
@@ -18,44 +18,56 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import org.apache.seatunnel.spark.SparkEnvironment;
+import 
org.apache.seatunnel.translation.spark.common.serialization.InternalRowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.catalyst.expressions.MutableValue;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<BaseSparkTransform> {
+@Slf4j
+public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunnelTransform> {
 
     private static final String PLUGIN_TYPE = "transform";
 
-    protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
-                                        JobContext jobContext,
-                                        List<? extends Config> pluginConfigs) {
+    protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment, 
JobContext jobContext, List<? extends Config> pluginConfigs) {
         super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
-    protected List<BaseSparkTransform> initializePlugins(List<? extends 
Config> pluginConfigs) {
-        SeaTunnelSparkTransformPluginDiscovery transformPluginDiscovery = new 
SeaTunnelSparkTransformPluginDiscovery();

Review Comment:
   remove this file 
`org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery`



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf:
##########
@@ -42,16 +42,16 @@ source {
 }
 
 transform {
-    sql {
-      sql = "select name,age from fake"
-    }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  Filter {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    fields = ["name", "age"]
+  }

Review Comment:
   Please remove this change.



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf:
##########
@@ -42,16 +42,16 @@ source {
 }
 
 transform {
-    sql {
-      sql = "select name,age from fake"
-    }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  Filter {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    fields = ["name", "age"]
+  }
 }
 
 sink {
   DataHub {
+    source_table_name = "fake1"

Review Comment:
   Please remove this added line.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf:
##########
@@ -41,8 +41,10 @@ source {
 }
 
 transform {
-  sql {
-    sql = "select name,age from fake"
+  Filter {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    fields = ["name", "age"]

Review Comment:
   Please remove this change.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java:
##########
@@ -18,14 +18,17 @@
 package org.apache.seatunnel.e2e.connector.assertion;
 
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
 import java.io.IOException;
 
+@DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, 
EngineType.FLINK}, disabledReason = "TODO: Transform v2 translation to flink 
isn't completed")

Review Comment:
   Please revert this change.



##########
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java:
##########
@@ -70,13 +82,39 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> 
upstreamDataStreams) throws
         Dataset<Row> input = upstreamDataStreams.get(0);
         List<Dataset<Row>> result = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
-            BaseSparkTransform transform = plugins.get(i);
-            Config pluginConfig = pluginConfigs.get(i);
-            Dataset<Row> stream = fromSourceTable(pluginConfig, 
sparkEnvironment).orElse(input);
-            input = transform.process(stream, sparkEnvironment);
-            registerInputTempView(pluginConfig, input);
-            result.add(input);
+            try {
+                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
+                Config pluginConfig = pluginConfigs.get(i);
+                Dataset<Row> stream = fromSourceTable(pluginConfig, 
sparkEnvironment).orElse(input);
+                input = sparkTransform(transform, stream);
+                registerInputTempView(pluginConfig, input);
+                result.add(input);
+            } catch (Exception e) {
+                throw new TaskExecuteException(
+                    String.format("SeaTunnel transform task: %s execute 
error", plugins.get(i).getPluginName()), e);
+            }
         }
         return result;
     }
+
+    private Dataset<Row> sparkTransform(SeaTunnelTransform transform, 
Dataset<Row> stream) throws IOException {
+        SeaTunnelDataType<?> seaTunnelDataType = 
TypeConverterUtils.convert(stream.schema());
+        transform.setTypeInfo(seaTunnelDataType);
+        StructType structType = (StructType) 
TypeConverterUtils.convert(transform.getProducedType());
+        List<Row> outputRows = new ArrayList<>();
+        Iterator<Row> rowIterator = stream.toLocalIterator();
+        SeaTunnelRow seaTunnelRow;
+        while (rowIterator.hasNext()) {
+            Row row = rowIterator.next();
+            seaTunnelRow = new 
InternalRowConverter(seaTunnelDataType).reconvert(InternalRow.apply(row.toSeq()));
+            seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
+            InternalRow internalRow = new 
InternalRowConverter(transform.getProducedType()).convert(seaTunnelRow);
+            MutableValue[] mutableValues = ((SpecificInternalRow) 
internalRow).values();
+            outputRows.add(new GenericRowWithSchema(
+                
Arrays.stream(mutableValues).map(MutableValue::boxed).toArray(),
+                structType));
+        }
+        return sparkEnvironment.getSparkSession().createDataFrame(outputRows, 
structType);

Review Comment:
   Maybe this should use `Dataset<Row> stream = Dataset#map(...)` or `Dataset<Row>
stream = Dataset#transform(...)` instead? The current implementation iterates the
dataset with `toLocalIterator()` and builds the result list on the driver, which
loses Spark's distributed execution and can exhaust driver memory on large inputs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to