This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 294cf6749 [Hotfix][Transform][Spark] Remove unnecessary row conversion 
(#4335)
294cf6749 is described below

commit 294cf67491aab7d231620d521911816c304304c5
Author: Laglangyue <[email protected]>
AuthorDate: Tue Mar 14 11:58:32 2023 +0800

    [Hotfix][Transform][Spark] Remove unnecessary row conversion (#4335)
    
    * [Bug][Transform][Spark] Remove unnecessary row conversion
    
    * convert
    
    * fix style
    
    * remove unneeded method, add test
    
    * fix style
    
    * fix
    
    * fix
    
    ---------
    
    Co-authored-by: laglangyue <[email protected]>
    Co-authored-by: tangjiafu <[email protected]>
---
 .../spark/execution/TransformExecuteProcessor.java | 13 ++---------
 .../spark/execution/TransformExecuteProcessor.java | 14 +++---------
 .../src/test/resources/sql_transform.conf          | 25 +++++++++++++++++++++-
 .../src/main/resources/examples/spark.batch.conf   |  3 ++-
 .../spark/serialization/InternalRowConverter.java  | 22 -------------------
 .../spark/serialization/InternalRowConverter.java  | 22 -------------------
 6 files changed, 31 insertions(+), 68 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 0b73719fc..76fa2d0e8 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,12 +26,10 @@ import 
org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import 
org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.types.StructType;
 
@@ -125,21 +123,14 @@ public class TransformExecuteProcessor
         SeaTunnelRow seaTunnelRow;
         List<Row> outputRows = new ArrayList<>();
         Iterator<Row> rowIterator = stream.toLocalIterator();
-        InternalRowConverter inputRowConverter = new 
InternalRowConverter(seaTunnelDataType);
-        InternalRowConverter outputRowConverter =
-                new InternalRowConverter(transform.getProducedType());
         while (rowIterator.hasNext()) {
             Row row = rowIterator.next();
-            seaTunnelRow = 
inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+            seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema) 
row).values());
             seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
             if (seaTunnelRow == null) {
                 continue;
             }
-            InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
-
-            Object[] fields = outputRowConverter.convertDateTime(internalRow, 
structType);
-
-            outputRows.add(new GenericRowWithSchema(fields, structType));
+            outputRows.add(new GenericRowWithSchema(seaTunnelRow.getFields(), 
structType));
         }
         return 
sparkRuntimeEnvironment.getSparkSession().createDataFrame(outputRows, 
structType);
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 0b73719fc..908c6cffc 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,12 +26,10 @@ import 
org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import 
org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.types.StructType;
 
@@ -125,22 +123,16 @@ public class TransformExecuteProcessor
         SeaTunnelRow seaTunnelRow;
         List<Row> outputRows = new ArrayList<>();
         Iterator<Row> rowIterator = stream.toLocalIterator();
-        InternalRowConverter inputRowConverter = new 
InternalRowConverter(seaTunnelDataType);
-        InternalRowConverter outputRowConverter =
-                new InternalRowConverter(transform.getProducedType());
         while (rowIterator.hasNext()) {
             Row row = rowIterator.next();
-            seaTunnelRow = 
inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+            seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema) 
row).values());
             seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
             if (seaTunnelRow == null) {
                 continue;
             }
-            InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
-
-            Object[] fields = outputRowConverter.convertDateTime(internalRow, 
structType);
-
-            outputRows.add(new GenericRowWithSchema(fields, structType));
+            outputRows.add(new GenericRowWithSchema(seaTunnelRow.getFields(), 
structType));
         }
+
         return 
sparkRuntimeEnvironment.getSparkSession().createDataFrame(outputRows, 
structType);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
index 8273213b3..e5c22dc48 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
@@ -31,6 +31,10 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_timestamp = "timestamp"
+        c_date = "date"
       }
     }
   }
@@ -41,7 +45,8 @@ transform {
     source_table_name = "fake"
     result_table_name = "fake1"
     # the query table name must same as field 'source_table_name'
-    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, 
pi() as pi from fake"
+    query = """select id, regexp_replace(name, '.+', 'b') as name, age+1 as 
age, pi() as pi,
+    c_map, c_array, c_timestamp, c_date from fake"""
   }
   # The SQL transform support base function and criteria operation
   # But the complex SQL unsupported yet, include: multi source table/rows JOIN 
and AGGREGATE operation and the like
@@ -105,6 +110,24 @@ sink {
                 rule_type = NOT_NULL
               }
             ]
+          },
+          {
+            field_name = c_timestamp
+            field_type = timestamp
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_date
+            field_type = date
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
           }
         ]
       }
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 7692598da..f3ad1b5f7 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -73,7 +73,8 @@ transform {
 
   # you can also use other transform plugins, such as sql
   sql {
-    sql = "select 
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
 from fake"
+    source_table_name = "fake"
+    query = "select 
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
 from fake"
     result_table_name = "sql"
   }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 40aaf1efa..ca7650d62 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -41,10 +41,7 @@ import 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
@@ -244,23 +241,4 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
         }
         return newArray;
     }
-
-    public Object[] convertDateTime(InternalRow internalRow, StructType 
structType) {
-        Object[] fields =
-                Arrays.stream(((SpecificInternalRow) internalRow).values())
-                        .map(MutableValue::boxed)
-                        .toArray();
-        int len = structType.fields().length;
-        for (int i = 0; i < len; i++) {
-            DataType dataType = structType.fields()[i].dataType();
-            Object field = fields[i];
-            if (dataType == DataTypes.TimestampType && field instanceof Long) {
-                fields[i] = 
Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field));
-            }
-            if (dataType == DataTypes.DateType && field instanceof Integer) {
-                fields[i] = Date.valueOf(LocalDate.ofEpochDay((int) field));
-            }
-        }
-        return fields;
-    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 40aaf1efa..ca7650d62 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -41,10 +41,7 @@ import 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
@@ -244,23 +241,4 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
         }
         return newArray;
     }
-
-    public Object[] convertDateTime(InternalRow internalRow, StructType 
structType) {
-        Object[] fields =
-                Arrays.stream(((SpecificInternalRow) internalRow).values())
-                        .map(MutableValue::boxed)
-                        .toArray();
-        int len = structType.fields().length;
-        for (int i = 0; i < len; i++) {
-            DataType dataType = structType.fields()[i].dataType();
-            Object field = fields[i];
-            if (dataType == DataTypes.TimestampType && field instanceof Long) {
-                fields[i] = 
Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field));
-            }
-            if (dataType == DataTypes.DateType && field instanceof Integer) {
-                fields[i] = Date.valueOf(LocalDate.ofEpochDay((int) field));
-            }
-        }
-        return fields;
-    }
 }

Reply via email to