This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1db9f45d39 [Bug][Translation][Spark] Fix SeaTunnelRowConverter failing to
convert when schema contains row type. (#5170)
1db9f45d39 is described below
commit 1db9f45d39fee3f2a67ffafe719d97ecdfbb7a1f
Author: Chengyu Yan <[email protected]>
AuthorDate: Mon Jul 31 11:22:34 2023 +0800
[Bug][Translation][Spark] Fix SeaTunnelRowConverter failing to convert when
schema contains row type. (#5170)
---
release-note.md | 2 ++
.../spark/execution/TransformExecuteProcessor.java | 1 -
.../src/test/resources/copy_transform.conf | 6 +++++
.../resources/filter_row_kind_exclude_delete.conf | 5 ++++
.../resources/filter_row_kind_exclude_insert.conf | 5 ++++
.../resources/filter_row_kind_include_insert.conf | 5 ++++
.../src/test/resources/filter_transform.conf | 7 +++++-
.../src/test/resources/split_transform.conf | 5 ++++
.../src/test/resources/field_mapper_transform.conf | 6 +++++
.../src/test/resources/sql_transform.conf | 7 +++++-
.../spark/serialization/SeaTunnelRowConverter.java | 29 +++++++++++++++++++---
11 files changed, 71 insertions(+), 7 deletions(-)
diff --git a/release-note.md b/release-note.md
index 0e84da433c..b542b35a81 100644
--- a/release-note.md
+++ b/release-note.md
@@ -7,6 +7,8 @@
- [Core] [API] Fix parse nested row data type key changed upper (#4459)
- [Starter][Flink]Support transform-v2 for flink #3396
- [Flink] Support flink 1.14.x #3963
+- [Core][Translation][Spark] Fix SeaTunnelRowConverter failing to convert when
schema contains row type (#5170)
+
### Transformer
- [Spark] Support transform-v2 for spark (#3409)
- [ALL]Add FieldMapper Transform #3781
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 179598b3a6..fc9be55925 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -185,7 +185,6 @@ public class TransformExecuteProcessor
return null;
}
seaTunnelRow = outputRowConverter.convert(seaTunnelRow);
-
return new GenericRowWithSchema(seaTunnelRow.getFields(),
structType);
} catch (Exception e) {
throw new TaskExecuteException("Row convert failed, caused: "
+ e.getMessage(), e);
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
index 25ca4ce5f9..b937b0a8cb 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform.conf
@@ -30,6 +30,11 @@ source {
fields {
id = "int"
name = "string"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
@@ -49,6 +54,7 @@ transform {
id_1 = "id"
name2 = "name"
name3 = "name"
+ c_row_1 = "c_row"
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
index f7fc0f6e0e..8fdf195b03 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_delete.conf
@@ -31,6 +31,11 @@ source {
id = "int"
name = "string"
age = "int"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
index cc36417788..9fc0e577cb 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_exclude_insert.conf
@@ -31,6 +31,11 @@ source {
id = "int"
name = "string"
age = "int"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
index d1fbf79bea..72d1e38cd4 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_kind_include_insert.conf
@@ -31,6 +31,11 @@ source {
id = "int"
name = "string"
age = "int"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
index 56439b4414..c869c70a77 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_transform.conf
@@ -31,6 +31,11 @@ source {
id = "int"
name = "string"
age = "int"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
@@ -40,7 +45,7 @@ transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
- fields = ["age", "name"]
+ fields = ["age", "name", "c_row"]
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
index 61e10f694a..7ad9fbf8f4 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/split_transform.conf
@@ -31,6 +31,11 @@ source {
id = "int"
name = "string"
age = "int"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
index c2d1f225f2..59d19f3ee7 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform.conf
@@ -34,6 +34,11 @@ source {
string1 = "string"
int1 = "int"
c_bigint = "bigint"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
@@ -48,6 +53,7 @@ transform {
age = age_as
int1 = int1_as
name = name
+ c_row = c_row
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
index c5f7c4047e..78e21280f0 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
@@ -36,6 +36,11 @@ source {
c_map = "map<string, string>"
c_array = "array<int>"
c_decimal = "decimal(30, 8)"
+ c_row = {
+ c_row = {
+ c_int = int
+ }
+ }
}
}
}
@@ -46,7 +51,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
# the query table name must same as field 'source_table_name'
- query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age,
pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal from fake"
+ query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age,
pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
}
# The SQL transform support base function and criteria operation
# But the complex SQL unsupported yet, include: multi source table/rows JOIN
and AGGREGATE operation and the like
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
index 51d5c7308b..15357204cd 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
@@ -24,7 +24,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.translation.serialization.RowConverter;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import scala.Tuple2;
@@ -51,7 +54,11 @@ public class SeaTunnelRowConverter extends
RowConverter<SeaTunnelRow> {
@Override
public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException {
validate(seaTunnelRow);
- return (SeaTunnelRow) convert(seaTunnelRow, dataType);
+ GenericRowWithSchema rowWithSchema = (GenericRowWithSchema)
convert(seaTunnelRow, dataType);
+ SeaTunnelRow newRow = new SeaTunnelRow(rowWithSchema.values());
+ newRow.setRowKind(seaTunnelRow.getRowKind());
+ newRow.setTableId(seaTunnelRow.getTableId());
+ return newRow;
}
private Object convert(Object field, SeaTunnelDataType<?> dataType) {
@@ -62,7 +69,7 @@ public class SeaTunnelRowConverter extends
RowConverter<SeaTunnelRow> {
case ROW:
SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- return convert(seaTunnelRow, rowType);
+ return convertRow(seaTunnelRow, rowType);
case DATE:
return Date.valueOf((LocalDate) field);
case TIMESTAMP:
@@ -94,16 +101,17 @@ public class SeaTunnelRowConverter extends
RowConverter<SeaTunnelRow> {
}
}
- private SeaTunnelRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType
rowType) {
+ private GenericRowWithSchema convertRow(SeaTunnelRow seaTunnelRow,
SeaTunnelRowType rowType) {
int arity = rowType.getTotalFields();
Object[] values = new Object[arity];
+ StructType schema = (StructType) TypeConverterUtils.convert(rowType);
for (int i = 0; i < arity; i++) {
Object fieldValue = convert(seaTunnelRow.getField(i),
rowType.getFieldType(i));
if (fieldValue != null) {
values[i] = fieldValue;
}
}
- return new SeaTunnelRow(values);
+ return new GenericRowWithSchema(values, schema);
}
private scala.collection.immutable.HashMap<Object, Object> convertMap(
@@ -148,6 +156,10 @@ public class SeaTunnelRowConverter extends
RowConverter<SeaTunnelRow> {
}
switch (dataType.getSqlType()) {
case ROW:
+ if (field instanceof GenericRowWithSchema) {
+ return createFromGenericRow(
+ (GenericRowWithSchema) field, (SeaTunnelRowType)
dataType);
+ }
return reconvert((SeaTunnelRow) field, (SeaTunnelRowType)
dataType);
case DATE:
return ((Date) field).toLocalDate();
@@ -166,6 +178,15 @@ public class SeaTunnelRowConverter extends
RowConverter<SeaTunnelRow> {
}
}
+ private SeaTunnelRow createFromGenericRow(GenericRowWithSchema row,
SeaTunnelRowType type) {
+ Object[] fields = row.values();
+ Object[] newFields = new Object[fields.length];
+ for (int idx = 0; idx < fields.length; idx++) {
+ newFields[idx] = reconvert(fields[idx], type.getFieldType(idx));
+ }
+ return new SeaTunnelRow(newFields);
+ }
+
private SeaTunnelRow reconvert(SeaTunnelRow engineRow, SeaTunnelRowType
rowType) {
int num = engineRow.getFields().length;
Object[] fields = new Object[num];