This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1db9f45d39 [Bug][Translation][Spark] Fix SeaTunnelRowConverter failing to convert when schema contains row type. (#5170)
1db9f45d39 is described below

commit 1db9f45d39fee3f2a67ffafe719d97ecdfbb7a1f
Author: Chengyu Yan <[email protected]>
AuthorDate: Mon Jul 31 11:22:34 2023 +0800

    [Bug][Translation][Spark] Fix SeaTunnelRowConverter failing to convert when schema contains row type. (#5170)
---
 release-note.md                                    |  2 ++
 .../spark/execution/TransformExecuteProcessor.java |  1 -
 .../src/test/resources/copy_transform.conf         |  6 +++++
 .../resources/filter_row_kind_exclude_delete.conf  |  5 ++++
 .../resources/filter_row_kind_exclude_insert.conf  |  5 ++++
 .../resources/filter_row_kind_include_insert.conf  |  5 ++++
 .../src/test/resources/filter_transform.conf       |  7 +++++-
 .../src/test/resources/split_transform.conf        |  5 ++++
 .../src/test/resources/field_mapper_transform.conf |  6 +++++
 .../src/test/resources/sql_transform.conf          |  7 +++++-
 .../spark/serialization/SeaTunnelRowConverter.java | 29 +++++++++++++++++++---
 11 files changed, 71 insertions(+), 7 deletions(-)

diff --git a/release-note.md b/release-note.md
index 0e84da433c..b542b35a81 100644
--- a/release-note.md
+++ b/release-note.md
@@ -7,6 +7,8 @@
 - [Core] [API] Fix parse nested row data type key changed upper (#4459)
 - [Starter][Flink]Support transform-v2 for flink #3396
 - [Flink] Support flink 1.14.x #3963
+- [Core][Translation][Spark] Fix SeaTunnelRowConvertor fail to convert when schema contains row type (#5170)
+
 ### Transformer
 - [Spark] Support transform-v2 for spark (#3409)
 - [ALL]Add FieldMapper Transform #3781
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 179598b3a6..fc9be55925 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -185,7 +185,6 @@ public class TransformExecuteProcessor
                     return null;
                 }
                 seaTunnelRow = outputRowConverter.convert(seaTunnelRow);
-
                 return new GenericRowWithSchema(seaTunnelRow.getFields(), structType);
             } catch (Exception e) {
                 throw new TaskExecuteException("Row convert failed, caused: " + e.getMessage(), e);
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index 25ca4ce5f9..b937b0a8cb 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -30,6 +30,11 @@ source {
       fields {
         id = "int"
         name = "string"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
@@ -49,6 +54,7 @@ transform {
       id_1 = "id"
       name2 = "name"
       name3 = "name"
+      c_row_1 = "c_row"
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
index f7fc0f6e0e..8fdf195b03 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -31,6 +31,11 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
index cc36417788..9fc0e577cb 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -31,6 +31,11 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
index d1fbf79bea..72d1e38cd4 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
@@ -31,6 +31,11 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
index 56439b4414..c869c70a77 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
@@ -31,6 +31,11 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
@@ -40,7 +45,7 @@ transform {
   Filter {
     source_table_name = "fake"
     result_table_name = "fake1"
-    fields = ["age", "name"]
+    fields = ["age", "name", "c_row"]
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
index 61e10f694a..7ad9fbf8f4 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
@@ -31,6 +31,11 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
index c2d1f225f2..59d19f3ee7 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
@@ -34,6 +34,11 @@ source {
         string1 = "string"
         int1 = "int"
         c_bigint = "bigint"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
@@ -48,6 +53,7 @@ transform {
       age = age_as
       int1 = int1_as
       name = name
+      c_row = c_row
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
index c5f7c4047e..78e21280f0 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
@@ -36,6 +36,11 @@ source {
         c_map = "map<string, string>"
         c_array = "array<int>"
         c_decimal = "decimal(30, 8)"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
       }
     }
   }
@@ -46,7 +51,7 @@ transform {
     source_table_name = "fake"
     result_table_name = "fake1"
     # the query table name must same as field 'source_table_name'
-    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, 
pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal from fake"
+    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, 
pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
   }
   # The SQL transform support base function and criteria operation
   # But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
index 51d5c7308b..15357204cd 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
@@ -24,7 +24,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.translation.serialization.RowConverter;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import scala.Tuple2;
@@ -51,7 +54,11 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
     @Override
     public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException {
         validate(seaTunnelRow);
-        return (SeaTunnelRow) convert(seaTunnelRow, dataType);
+        GenericRowWithSchema rowWithSchema = (GenericRowWithSchema) 
convert(seaTunnelRow, dataType);
+        SeaTunnelRow newRow = new SeaTunnelRow(rowWithSchema.values());
+        newRow.setRowKind(seaTunnelRow.getRowKind());
+        newRow.setTableId(seaTunnelRow.getTableId());
+        return newRow;
     }
 
     private Object convert(Object field, SeaTunnelDataType<?> dataType) {
@@ -62,7 +69,7 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
             case ROW:
                 SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
                 SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-                return convert(seaTunnelRow, rowType);
+                return convertRow(seaTunnelRow, rowType);
             case DATE:
                 return Date.valueOf((LocalDate) field);
             case TIMESTAMP:
@@ -94,16 +101,17 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
         }
     }
 
-    private SeaTunnelRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType 
rowType) {
+    private GenericRowWithSchema convertRow(SeaTunnelRow seaTunnelRow, 
SeaTunnelRowType rowType) {
         int arity = rowType.getTotalFields();
         Object[] values = new Object[arity];
+        StructType schema = (StructType) TypeConverterUtils.convert(rowType);
         for (int i = 0; i < arity; i++) {
             Object fieldValue = convert(seaTunnelRow.getField(i), 
rowType.getFieldType(i));
             if (fieldValue != null) {
                 values[i] = fieldValue;
             }
         }
-        return new SeaTunnelRow(values);
+        return new GenericRowWithSchema(values, schema);
     }
 
     private scala.collection.immutable.HashMap<Object, Object> convertMap(
@@ -148,6 +156,10 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
         }
         switch (dataType.getSqlType()) {
             case ROW:
+                if (field instanceof GenericRowWithSchema) {
+                    return createFromGenericRow(
+                            (GenericRowWithSchema) field, (SeaTunnelRowType) 
dataType);
+                }
                 return reconvert((SeaTunnelRow) field, (SeaTunnelRowType) 
dataType);
             case DATE:
                 return ((Date) field).toLocalDate();
@@ -166,6 +178,15 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
         }
     }
 
+    private SeaTunnelRow createFromGenericRow(GenericRowWithSchema row, 
SeaTunnelRowType type) {
+        Object[] fields = row.values();
+        Object[] newFields = new Object[fields.length];
+        for (int idx = 0; idx < fields.length; idx++) {
+            newFields[idx] = reconvert(fields[idx], type.getFieldType(idx));
+        }
+        return new SeaTunnelRow(newFields);
+    }
+
     private SeaTunnelRow reconvert(SeaTunnelRow engineRow, SeaTunnelRowType 
rowType) {
         int num = engineRow.getFields().length;
         Object[] fields = new Object[num];

Reply via email to