This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 294cf6749 [Hotfix][Transform][Spark] Remove unnecessary row conversion
(#4335)
294cf6749 is described below
commit 294cf67491aab7d231620d521911816c304304c5
Author: Laglangyue <[email protected]>
AuthorDate: Tue Mar 14 11:58:32 2023 +0800
[Hotfix][Transform][Spark] Remove unnecessary row conversion (#4335)
* [Bug][Transform][Spark] Remove unnecessary row conversion
* convert
* fix style
* remove unneeded method, add test
* fix style
* fix
* fix
---------
Co-authored-by: laglangyue <[email protected]>
Co-authored-by: tangjiafu <[email protected]>
---
.../spark/execution/TransformExecuteProcessor.java | 13 ++---------
.../spark/execution/TransformExecuteProcessor.java | 14 +++---------
.../src/test/resources/sql_transform.conf | 25 +++++++++++++++++++++-
.../src/main/resources/examples/spark.batch.conf | 3 ++-
.../spark/serialization/InternalRowConverter.java | 22 -------------------
.../spark/serialization/InternalRowConverter.java | 22 -------------------
6 files changed, 31 insertions(+), 68 deletions(-)
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 0b73719fc..76fa2d0e8 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,12 +26,10 @@ import
org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import
org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
@@ -125,21 +123,14 @@ public class TransformExecuteProcessor
SeaTunnelRow seaTunnelRow;
List<Row> outputRows = new ArrayList<>();
Iterator<Row> rowIterator = stream.toLocalIterator();
- InternalRowConverter inputRowConverter = new
InternalRowConverter(seaTunnelDataType);
- InternalRowConverter outputRowConverter =
- new InternalRowConverter(transform.getProducedType());
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
- seaTunnelRow =
inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+ seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema)
row).values());
seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
if (seaTunnelRow == null) {
continue;
}
- InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
-
- Object[] fields = outputRowConverter.convertDateTime(internalRow,
structType);
-
- outputRows.add(new GenericRowWithSchema(fields, structType));
+ outputRows.add(new GenericRowWithSchema(seaTunnelRow.getFields(),
structType));
}
return
sparkRuntimeEnvironment.getSparkSession().createDataFrame(outputRows,
structType);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 0b73719fc..908c6cffc 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,12 +26,10 @@ import
org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import
org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
@@ -125,22 +123,16 @@ public class TransformExecuteProcessor
SeaTunnelRow seaTunnelRow;
List<Row> outputRows = new ArrayList<>();
Iterator<Row> rowIterator = stream.toLocalIterator();
- InternalRowConverter inputRowConverter = new
InternalRowConverter(seaTunnelDataType);
- InternalRowConverter outputRowConverter =
- new InternalRowConverter(transform.getProducedType());
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
- seaTunnelRow =
inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+ seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema)
row).values());
seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
if (seaTunnelRow == null) {
continue;
}
- InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
-
- Object[] fields = outputRowConverter.convertDateTime(internalRow,
structType);
-
- outputRows.add(new GenericRowWithSchema(fields, structType));
+ outputRows.add(new GenericRowWithSchema(seaTunnelRow.getFields(),
structType));
}
+
return
sparkRuntimeEnvironment.getSparkSession().createDataFrame(outputRows,
structType);
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
index 8273213b3..e5c22dc48 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
@@ -31,6 +31,10 @@ source {
id = "int"
name = "string"
age = "int"
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_timestamp = "timestamp"
+ c_date = "date"
}
}
}
@@ -41,7 +45,8 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
# the query table name must same as field 'source_table_name'
- query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age,
pi() as pi from fake"
+ query = """select id, regexp_replace(name, '.+', 'b') as name, age+1 as
age, pi() as pi,
+ c_map, c_array, c_timestamp, c_date from fake"""
}
# The SQL transform support base function and criteria operation
# But the complex SQL unsupported yet, include: multi source table/rows JOIN
and AGGREGATE operation and the like
@@ -105,6 +110,24 @@ sink {
rule_type = NOT_NULL
}
]
+ },
+ {
+ field_name = c_timestamp
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
}
]
}
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 7692598da..f3ad1b5f7 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -73,7 +73,8 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
from fake"
+ source_table_name = "fake"
+ query = "select
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
from fake"
result_table_name = "sql"
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 40aaf1efa..ca7650d62 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -41,10 +41,7 @@ import
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
@@ -244,23 +241,4 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
}
return newArray;
}
-
- public Object[] convertDateTime(InternalRow internalRow, StructType
structType) {
- Object[] fields =
- Arrays.stream(((SpecificInternalRow) internalRow).values())
- .map(MutableValue::boxed)
- .toArray();
- int len = structType.fields().length;
- for (int i = 0; i < len; i++) {
- DataType dataType = structType.fields()[i].dataType();
- Object field = fields[i];
- if (dataType == DataTypes.TimestampType && field instanceof Long) {
- fields[i] =
Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field));
- }
- if (dataType == DataTypes.DateType && field instanceof Integer) {
- fields[i] = Date.valueOf(LocalDate.ofEpochDay((int) field));
- }
- }
- return fields;
- }
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 40aaf1efa..ca7650d62 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -41,10 +41,7 @@ import
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
@@ -244,23 +241,4 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
}
return newArray;
}
-
- public Object[] convertDateTime(InternalRow internalRow, StructType
structType) {
- Object[] fields =
- Arrays.stream(((SpecificInternalRow) internalRow).values())
- .map(MutableValue::boxed)
- .toArray();
- int len = structType.fields().length;
- for (int i = 0; i < len; i++) {
- DataType dataType = structType.fields()[i].dataType();
- Object field = fields[i];
- if (dataType == DataTypes.TimestampType && field instanceof Long) {
- fields[i] =
Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field));
- }
- if (dataType == DataTypes.DateType && field instanceof Integer) {
- fields[i] = Date.valueOf(LocalDate.ofEpochDay((int) field));
- }
- }
- return fields;
- }
}