This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 132278c06a [Feature][transform] transform support explode (#7928)
132278c06a is described below
commit 132278c06a6f2af12934b948f65f8baf4b69db79
Author: CosmosNi <[email protected]>
AuthorDate: Sat Nov 9 16:55:35 2024 +0800
[Feature][transform] transform support explode (#7928)
Co-authored-by: njh_cmss <[email protected]>
---
docs/en/transform-v2/sql-functions.md | 31 +++
docs/zh/transform-v2/sql-functions.md | 32 +++
pom.xml | 2 +-
.../api/transform/SeaTunnelMultiRowTransform.java | 28 ++-
.../seatunnel/config/sql/SqlConfigBuilder.java | 5 +-
.../seatunnel/jdbc/source/FixedChunkSplitter.java | 4 +-
.../converter/SqlToPaimonPredicateConverter.java | 15 +-
.../flink/execution/TransformExecuteProcessor.java | 29 +++
.../spark/execution/TransformExecuteProcessor.java | 61 +++---
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 12 ++
.../resources/sql_transform/explode_transform.conf | 103 ++++++++++
.../explode_transform_with_outer.conf | 99 +++++++++
.../explode_transform_without_outer.conf | 95 +++++++++
.../test/resources/sql_transform/func_system.conf | 6 +-
.../server/task/flow/TransformFlowLifeCycle.java | 66 ++++--
.../AbstractCatalogMultiRowTransform.java} | 33 +--
.../common/AbstractCatalogSupportTransform.java | 70 +------
...nsform.java => AbstractSeaTunnelTransform.java} | 60 +++---
.../apache/seatunnel/transform/sql/SQLEngine.java | 2 +-
.../seatunnel/transform/sql/SQLTransform.java | 12 +-
.../transform/sql/zeta/ZetaSQLEngine.java | 57 +++---
.../transform/sql/zeta/ZetaSQLFilter.java | 5 +-
.../transform/sql/zeta/ZetaSQLFunction.java | 226 ++++++++++++++++++++-
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 11 +-
.../sql/zeta/functions/StringFunction.java | 14 ++
.../sql/zeta/functions/SystemFunction.java | 12 ++
.../seatunnel/transform/sql/SQLTransformTest.java | 13 +-
.../transform/sql/zeta/DateTimeFunctionTest.java | 4 +-
tools/dependencies/known-dependencies.txt | 2 +-
29 files changed, 872 insertions(+), 237 deletions(-)
diff --git a/docs/en/transform-v2/sql-functions.md
b/docs/en/transform-v2/sql-functions.md
index ce01df937f..31a3398937 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -302,6 +302,14 @@ Example:
REPLACE(NAME, ' ')
+### SPLIT
+
+Split a string into an array.
+
+Example:
+
+select SPLIT(test,';') as arrays
+
### SOUNDEX
```SOUNDEX(string)```
@@ -984,3 +992,26 @@ Example:
select UUID() as seatunnel_uuid
+### ARRAY
+
+Generate an array.
+
+Example:
+
+select ARRAY('test1','test2','test3') as arrays
+
+
+### LATERAL VIEW
+#### EXPLODE
+
+Explodes an array column into rows, one row per element.
+
+- With `OUTER`, `EXPLODE` returns NULL when the array is NULL or empty; without `OUTER`, NULL elements are skipped.
+- `EXPLODE(SPLIT(FIELD_NAME, separator))` splits a string column. The first parameter of SPLIT is the field name; the second is the separator.
+- `EXPLODE(ARRAY(value1, value2))` explodes a custom array of values into a new column.
+```
+SELECT * FROM fake
+ LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
+ LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
+ LATERAL VIEW OUTER EXPLODE ( age ) AS age
+ LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
+```
diff --git a/docs/zh/transform-v2/sql-functions.md
b/docs/zh/transform-v2/sql-functions.md
index 13dc3a9bc5..7e3f8454e1 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -302,6 +302,15 @@ REPEAT(NAME || ' ', 10)
REPLACE(NAME, ' ')
+
+### SPLIT
+
+将字符串切分成数组。
+
+示例:
+
+select SPLIT(test,';') as arrays
+
### SOUNDEX
```SOUNDEX(string)```
@@ -975,3 +984,26 @@ case when c_string in ('c_string') then 1 else 0 end
select UUID() as seatunnel_uuid
+
+### ARRAY
+
+生成一个数组。
+
+示例:
+
+select ARRAY('test1','test2','test3') as arrays
+
+### LATERAL VIEW
+#### EXPLODE
+
+将 array 列展开成多行,每个元素一行。
+
+- 使用 `OUTER` 时,若 array 为 NULL 或为空,返回 NULL;不使用 `OUTER` 时,数组中的 NULL 元素会被跳过。
+- `EXPLODE(SPLIT(FIELD_NAME, separator))` 用于切分字符串类型字段:SPLIT 的第一个参数是字段名,第二个参数是分隔符。
+- `EXPLODE(ARRAY(value1, value2))` 用于展开自定义数组,在原有字段基础上生成一个新的字段。
+```
+SELECT * FROM fake
+ LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
+ LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
+ LATERAL VIEW OUTER EXPLODE ( age ) AS age
+ LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
+```
diff --git a/pom.xml b/pom.xml
index 872e94112b..fdb967a1d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
<maven-scm-provider-jgit.version>2.0.0</maven-scm-provider-jgit.version>
<testcontainer.version>1.17.6</testcontainer.version>
<spotless.version>2.29.0</spotless.version>
- <jsqlparser.version>4.5</jsqlparser.version>
+ <jsqlparser.version>4.9</jsqlparser.version>
<json-path.version>2.7.0</json-path.version>
<groovy.version>4.0.16</groovy.version>
<jetty.version>9.4.56.v20240826</jetty.version>
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
similarity index 62%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
index 6dfaddca00..1f78e8be48 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
@@ -14,24 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.seatunnel.transform.sql;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+package org.apache.seatunnel.api.transform;
import java.util.List;
-public interface SQLEngine {
- void init(
- String inputTableName,
- String catalogTableName,
- SeaTunnelRowType inputRowType,
- String sql);
-
- SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
+public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
- SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+ /**
+ * Transform input data to {@link
this#getProducedCatalogTable().getSeaTunnelRowType()} types
+ * data.
+ *
+ * @param row the data need be transformed.
+ * @return transformed data.
+ */
+ List<T> flatMap(T row);
- default void close() {}
+ default T map(T row) {
+ throw new UnsupportedOperationException("Heads-up conversion is not
supported");
+ }
}
diff --git
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
index f0d68e089b..00f6a40a43 100644
---
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
+++
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
@@ -43,7 +43,6 @@ import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import java.nio.file.Files;
@@ -360,12 +359,12 @@ public class SqlConfigBuilder {
String sourceTableName;
String resultTableName;
if (plainSelect.getFromItem() == null) {
- List<SelectItem> selectItems = plainSelect.getSelectItems();
+ List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
if (selectItems.size() != 1) {
throw new ParserException(
"Source table must be specified in SQL: " +
insertSql);
}
- SelectExpressionItem selectItem = (SelectExpressionItem)
selectItems.get(0);
+ SelectItem<?> selectItem = selectItems.get(0);
Column column = (Column) selectItem.getExpression();
sourceTableName = column.getColumnName();
resultTableName = sourceTableName;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
index edeef96f0a..72a4e061ac 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
@@ -82,8 +82,8 @@ public class FixedChunkSplitter extends ChunkSplitter {
partitionEnd = range.getRight();
}
if (partitionStart == null || partitionEnd == null) {
- JdbcSourceSplit spilt = createSingleSplit(table);
- return Collections.singletonList(spilt);
+ JdbcSourceSplit split = createSingleSplit(table);
+ return Collections.singletonList(split);
}
return createNumberColumnSplits(
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 212bfd6e8b..0bf47b1310 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -54,8 +54,6 @@ import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectBody;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import java.math.BigDecimal;
@@ -83,7 +81,7 @@ public class SqlToPaimonPredicateConverter {
throw new IllegalArgumentException("Only SELECT statements are
supported.");
}
Select select = (Select) statement;
- SelectBody selectBody = select.getSelectBody();
+ Select selectBody = select.getSelectBody();
if (!(selectBody instanceof PlainSelect)) {
throw new IllegalArgumentException("Only simple SELECT statements
are supported.");
}
@@ -101,18 +99,15 @@ public class SqlToPaimonPredicateConverter {
public static int[] convertSqlSelectToPaimonProjectionIndex(
String[] fieldNames, PlainSelect plainSelect) {
int[] projectionIndex = null;
- List<SelectItem> selectItems = plainSelect.getSelectItems();
+ List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
List<String> columnNames = new ArrayList<>();
for (SelectItem selectItem : selectItems) {
- if (selectItem instanceof AllColumns) {
+ if (selectItem.getExpression() instanceof AllColumns) {
return null;
- } else if (selectItem instanceof SelectExpressionItem) {
- SelectExpressionItem selectExpressionItem =
(SelectExpressionItem) selectItem;
- String columnName =
selectExpressionItem.getExpression().toString();
- columnNames.add(columnName);
} else {
- throw new IllegalArgumentException("Error encountered parsing
query fields.");
+ String columnName = ((Column)
selectItem.getExpression()).getColumnName();
+ columnNames.add(columnName);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 00a5046e43..982179dd2e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,15 +25,19 @@ import
org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.util.Collector;
import java.net.URL;
import java.util.ArrayList;
@@ -140,6 +144,11 @@ public class TransformExecuteProcessor
protected DataStream<SeaTunnelRow> flinkTransform(
SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
+ if (transform instanceof SeaTunnelMultiRowTransform) {
+ return stream.flatMap(
+ new ArrayFlatMap(transform),
TypeInformation.of(SeaTunnelRow.class));
+ }
+
return stream.transform(
String.format("%s-Transform", transform.getPluginName()),
TypeInformation.of(SeaTunnelRow.class),
@@ -151,4 +160,24 @@ public class TransformExecuteProcessor
((SeaTunnelTransform<SeaTunnelRow>) transform)
.map(row))));
}
+
+ /**
+ * Flink {@link FlatMapFunction} adapter for {@link SeaTunnelMultiRowTransform}: every row
+ * returned by the transform's {@code flatMap} is forwarded to the collector, and a null or
+ * empty result emits nothing.
+ *
+ * <p>NOTE(review): the field is a raw {@link SeaTunnelTransform} and is cast to
+ * {@code SeaTunnelMultiRowTransform} on every call; the only visible caller (flinkTransform)
+ * checks {@code instanceof SeaTunnelMultiRowTransform} before constructing this adapter, so
+ * the cast is expected to succeed there — confirm for any other caller.
+ */
+ public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow,
SeaTunnelRow> {
+
+ // Wrapped transform; expected to be a SeaTunnelMultiRowTransform (see cast in flatMap).
+ private SeaTunnelTransform transform;
+
+ /**
+ * @param transform the multi-row transform whose output rows this function emits
+ */
+ public ArrayFlatMap(SeaTunnelTransform transform) {
+ this.transform = transform;
+ }
+
+ /**
+ * Expands one input row into zero or more output rows.
+ *
+ * @param row the incoming row
+ * @param collector receives each row produced by the transform, in order
+ */
+ @Override
+ public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow>
collector) {
+ List<SeaTunnelRow> rows =
+ ((SeaTunnelMultiRowTransform<SeaTunnelRow>)
transform).flatMap(row);
+ // A null or empty result means the input row produces no output.
+ if (CollectionUtils.isNotEmpty(rows)) {
+ for (SeaTunnelRow rowResult : rows) {
+ collector.collect(rowResult);
+ }
+ }
+ }
+ }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 3736576817..44a60195ee 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -35,7 +36,8 @@ import
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
@@ -45,7 +47,6 @@ import org.apache.spark.sql.types.StructType;
import lombok.extern.slf4j.Slf4j;
-import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -164,57 +165,49 @@ public class TransformExecuteProcessor
SeaTunnelRowConverter inputRowConverter = new
SeaTunnelRowConverter(inputDataType);
SeaTunnelRowConverter outputRowConverter = new
SeaTunnelRowConverter(outputDataTYpe);
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
- return stream.mapPartitions(
- (MapPartitionsFunction<Row, Row>)
- (Iterator<Row> rowIterator) ->
- new TransformIterator(
- rowIterator,
- transform,
- outputSchema,
- inputRowConverter,
- outputRowConverter),
+
+ return stream.flatMap(
+ new TransformMapPartitionsFunction(
+ transform, inputRowConverter,
outputRowConverter),
encoder)
.filter(Objects::nonNull);
}
- private static class TransformIterator implements Iterator<Row>,
Serializable {
- private Iterator<Row> sourceIterator;
+ private static class TransformMapPartitionsFunction implements
FlatMapFunction<Row, Row> {
private SeaTunnelTransform<SeaTunnelRow> transform;
- private StructType structType;
private SeaTunnelRowConverter inputRowConverter;
private SeaTunnelRowConverter outputRowConverter;
- public TransformIterator(
- Iterator<Row> sourceIterator,
+ public TransformMapPartitionsFunction(
SeaTunnelTransform<SeaTunnelRow> transform,
- StructType structType,
SeaTunnelRowConverter inputRowConverter,
SeaTunnelRowConverter outputRowConverter) {
- this.sourceIterator = sourceIterator;
this.transform = transform;
- this.structType = structType;
this.inputRowConverter = inputRowConverter;
this.outputRowConverter = outputRowConverter;
}
@Override
- public boolean hasNext() {
- return sourceIterator.hasNext();
- }
-
- @Override
- public Row next() {
- try {
- Row row = sourceIterator.next();
- SeaTunnelRow seaTunnelRow =
inputRowConverter.unpack((GenericRowWithSchema) row);
- seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
- if (seaTunnelRow == null) {
- return null;
+ public Iterator<Row> call(Row row) throws Exception {
+ List<Row> rows = new ArrayList<>();
+
+ SeaTunnelRow seaTunnelRow =
inputRowConverter.unpack((GenericRowWithSchema) row);
+ if (transform instanceof SeaTunnelMultiRowTransform) {
+ List<SeaTunnelRow> seaTunnelRows =
+ ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
+ .flatMap(seaTunnelRow);
+ if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
+ for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
+
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
+ }
+ }
+ } else {
+ SeaTunnelRow seaTunnelRowTransform =
transform.map(seaTunnelRow);
+ if (seaTunnelRowTransform != null) {
+ rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
- return outputRowConverter.parcel(seaTunnelRow);
- } catch (Exception e) {
- throw new TaskExecuteException("Row convert failed, caused: "
+ e.getMessage(), e);
}
+ return rows.iterator();
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index df404a2852..a12eabe7ef 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -59,6 +59,18 @@ public class TestSQLIT extends TestSuiteBase {
Assertions.assertEquals(0, sqlAllColumns.getExitCode());
Container.ExecResult caseWhenSql =
container.executeJob("/sql_transform/case_when.conf");
Assertions.assertEquals(0, caseWhenSql.getExitCode());
+
+ Container.ExecResult execResultBySql =
+ container.executeJob("/sql_transform/explode_transform.conf");
+ Assertions.assertEquals(0, execResultBySql.getExitCode());
+
+ Container.ExecResult execResultBySqlWithoutOuter =
+
container.executeJob("/sql_transform/explode_transform_without_outer.conf");
+ Assertions.assertEquals(0, execResultBySqlWithoutOuter.getExitCode());
+
+ Container.ExecResult execResultBySqlWithOuter =
+
container.executeJob("/sql_transform/explode_transform_with_outer.conf");
+ Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
new file mode 100644
index 0000000000..8df8bc5076
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ pk_id = string
+ name = string
+ age = array<String>
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["id001;id001", "zhangsan,zhangsan",["1","1"]]
+ },
+ {
+ kind = INSERT
+ fields = ["id001", "zhangsan,zhangsan",["1"]]
+ },
+ {
+ kind = INSERT
+ fields = ["id001;id001", "zhangsan",["1"]]
+ }
+ ]
+ }
+}
+
+transform {
+ Sql {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ query = "SELECT * FROM fake LATERAL VIEW OUTER EXPLODE(SPLIT(name, ','))
as name LATERAL VIEW OUTER EXPLODE(SPLIT(pk_id, ';')) as pk_id LATERAL VIEW
OUTER EXPLODE(age) as age LATERAL VIEW EXPLODE(ARRAY(1,1)) as num"
+ }
+}
+
+sink{
+ assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 24
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 24
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = string
+ field_value = [{equals_to = id001}]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [{equals_to = zhangsan}]
+ },
+ {
+ field_name = age
+ field_type = string
+ field_value = [{equals_to = 1}]
+ },
+ {
+ field_name = num
+ field_type = string
+ field_value = [{equals_to = 1}]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
new file mode 100644
index 0000000000..177cc381f0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ pk_id = string
+ name = string
+ age = array<String>
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["id001", "zhangsan",[null,null]]
+ }
+ ]
+ }
+}
+
+transform {
+ Sql {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ query = "SELECT * FROM fake LATERAL VIEW OUTER EXPLODE(age) as age LATERAL
VIEW OUTER EXPLODE(ARRAY(null,null)) as num"
+ }
+}
+
+sink{
+ assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 4
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 4
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = string
+ field_value = [{equals_to = id001}]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [{equals_to = zhangsan}]
+ },
+ {
+ field_name = age
+ field_type = "null"
+ field_value = [
+ {rule_type = NULL}
+ ]
+ },
+ {
+ field_name = num
+ field_type = "null"
+ field_value = [
+ {rule_type = NULL}
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
new file mode 100644
index 0000000000..1e35a2cc50
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+ parallelism = 1
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ pk_id = string
+ name = string
+ age = array<String>
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["id001", "zhangsan",[1,null]]
+ }
+ ]
+ }
+}
+
+transform {
+ Sql {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ query = "SELECT * FROM fake LATERAL VIEW EXPLODE(age) as age LATERAL VIEW
EXPLODE(ARRAY(1,1,null)) as num"
+ }
+}
+
+sink{
+ assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = string
+ field_value = [{equals_to = id001}]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [{equals_to = zhangsan}]
+ },
+ {
+ field_name = age
+ field_type = "string"
+ field_value = [{equals_to = 1}]
+ },
+ {
+ field_name = num
+ field_type = "string"
+ field_value = [{equals_to = 1}]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
index a189c7c2dd..c3c97f7f10 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
@@ -49,7 +49,7 @@ transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
- query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1,
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1,
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6,
cast(name as bytes) as c7, name as `apply` from fake"
+ query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1,
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1,
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6,
cast(name as BINARY) as c7, name as `apply` from fake"
}
}
@@ -76,7 +76,7 @@ sink {
field_name = "id3"
field_type = "double"
field_value = [
- {equals_to = 1}
+ {equals_to = 1.0}
]
},
{
@@ -175,4 +175,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 0447513b5f..093ac1cca6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -27,9 +28,12 @@ import
org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
+import org.apache.commons.collections4.CollectionUtils;
+
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -89,22 +93,60 @@ public class TransformFlowLifeCycle<T> extends
ActionFlowLifeCycle
return;
}
T inputData = (T) record.getData();
- T outputData = inputData;
- for (SeaTunnelTransform<T> t : transform) {
- outputData = t.map(inputData);
- log.debug("Transform[{}] input row {} and output row {}", t,
inputData, outputData);
- if (outputData == null) {
- log.trace("Transform[{}] filtered data row {}", t,
inputData);
- break;
+ List<T> outputDataList = transform(inputData);
+ if (!outputDataList.isEmpty()) {
+ // todo log metrics
+ for (T outputData : outputDataList) {
+ collector.collect(new Record<>(outputData));
}
-
- inputData = outputData;
}
- if (outputData != null) {
- // todo log metrics
- collector.collect(new Record<>(outputData));
+ }
+ }
+
+ public List<T> transform(T inputData) {
+ if (transform.isEmpty()) {
+ return Collections.singletonList(inputData);
+ }
+
+ List<T> dataList = new ArrayList<>();
+ dataList.add(inputData);
+
+ for (SeaTunnelTransform<T> transformer : transform) {
+ List<T> nextInputDataList = new ArrayList<>();
+ if (transformer instanceof SeaTunnelMultiRowTransform) {
+ SeaTunnelMultiRowTransform<T> transformDecorator =
+ (SeaTunnelMultiRowTransform<T>) transformer;
+ for (T data : dataList) {
+ List<T> outputDataArray = transformDecorator.flatMap(data);
+ log.debug(
+ "Transform[{}] input row {} and output row {}",
+ transformer,
+ data,
+ outputDataArray);
+ if (CollectionUtils.isNotEmpty(outputDataArray)) {
+ nextInputDataList.addAll(outputDataArray);
+ }
+ }
+ } else {
+ for (T data : dataList) {
+ T outputData = transformer.map(data);
+ log.debug(
+ "Transform[{}] input row {} and output row {}",
+ transformer,
+ data,
+ outputData);
+ if (outputData == null) {
+ log.trace("Transform[{}] filtered data row {}",
transformer, data);
+ continue;
+ }
+ nextInputDataList.add(outputData);
+ }
}
+
+ dataList = nextInputDataList;
}
+
+ return dataList;
}
@Override
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
similarity index 50%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
index 6dfaddca00..bf7dc2ab96 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
@@ -14,24 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.transform.common;
-package org.apache.seatunnel.transform.sql;
-
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.util.List;
-public interface SQLEngine {
- void init(
- String inputTableName,
- String catalogTableName,
- SeaTunnelRowType inputRowType,
- String sql);
+@Slf4j
+public abstract class AbstractCatalogMultiRowTransform
+ extends AbstractSeaTunnelTransform<SeaTunnelRow, List<SeaTunnelRow>>
+ implements SeaTunnelMultiRowTransform<SeaTunnelRow> {
- SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
+ public AbstractCatalogMultiRowTransform(@NonNull CatalogTable
inputCatalogTable) {
+ super(inputCatalogTable);
+ }
- SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+ public AbstractCatalogMultiRowTransform(
+ @NonNull CatalogTable inputCatalogTable, ErrorHandleWay
rowErrorHandleWay) {
+ super(inputCatalogTable, rowErrorHandleWay);
+ }
- default void close() {}
+ @Override
+ public List<SeaTunnelRow> flatMap(SeaTunnelRow row) {
+ return transform(row);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index a0fa464af7..358bcd4298 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -18,85 +18,27 @@
package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public abstract class AbstractCatalogSupportTransform implements
SeaTunnelTransform<SeaTunnelRow> {
- protected final ErrorHandleWay rowErrorHandleWay;
- protected CatalogTable inputCatalogTable;
-
- protected volatile CatalogTable outputCatalogTable;
-
+public abstract class AbstractCatalogSupportTransform
+ extends AbstractSeaTunnelTransform<SeaTunnelRow, SeaTunnelRow>
+ implements SeaTunnelTransform<SeaTunnelRow> {
public AbstractCatalogSupportTransform(@NonNull CatalogTable
inputCatalogTable) {
- this(inputCatalogTable,
CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
+ super(inputCatalogTable);
}
public AbstractCatalogSupportTransform(
@NonNull CatalogTable inputCatalogTable, ErrorHandleWay
rowErrorHandleWay) {
- this.inputCatalogTable = inputCatalogTable;
- this.rowErrorHandleWay = rowErrorHandleWay;
+ super(inputCatalogTable, rowErrorHandleWay);
}
@Override
public SeaTunnelRow map(SeaTunnelRow row) {
- try {
- return transformRow(row);
- } catch (ErrorDataTransformException e) {
- if (e.getErrorHandleWay() != null) {
- ErrorHandleWay errorHandleWay = e.getErrorHandleWay();
- if (errorHandleWay.allowSkipThisRow()) {
- log.debug("Skip row due to error", e);
- return null;
- }
- throw e;
- }
- if (rowErrorHandleWay.allowSkip()) {
- log.debug("Skip row due to error", e);
- return null;
- }
- throw e;
- }
+ return transform(row);
}
-
- /**
- * Outputs transformed row data.
- *
- * @param inputRow upstream input row data
- */
- protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
- @Override
- public CatalogTable getProducedCatalogTable() {
- if (outputCatalogTable == null) {
- synchronized (this) {
- if (outputCatalogTable == null) {
- outputCatalogTable = transformCatalogTable();
- }
- }
- }
-
- return outputCatalogTable;
- }
-
- private CatalogTable transformCatalogTable() {
- TableIdentifier tableIdentifier = transformTableIdentifier();
- TableSchema tableSchema = transformTableSchema();
- return CatalogTable.of(
- tableIdentifier,
- tableSchema,
- inputCatalogTable.getOptions(),
- inputCatalogTable.getPartitionKeys(),
- inputCatalogTable.getComment());
- }
-
- protected abstract TableSchema transformTableSchema();
-
- protected abstract TableIdentifier transformTableIdentifier();
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
similarity index 89%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index a0fa464af7..a6dc217adb 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.seatunnel.transform.common;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -28,24 +27,47 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public abstract class AbstractCatalogSupportTransform implements
SeaTunnelTransform<SeaTunnelRow> {
+public abstract class AbstractSeaTunnelTransform<T, R> implements
SeaTunnelTransform<T> {
+
protected final ErrorHandleWay rowErrorHandleWay;
protected CatalogTable inputCatalogTable;
protected volatile CatalogTable outputCatalogTable;
- public AbstractCatalogSupportTransform(@NonNull CatalogTable
inputCatalogTable) {
+ public AbstractSeaTunnelTransform(@NonNull CatalogTable inputCatalogTable)
{
this(inputCatalogTable,
CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
}
- public AbstractCatalogSupportTransform(
+ public AbstractSeaTunnelTransform(
@NonNull CatalogTable inputCatalogTable, ErrorHandleWay
rowErrorHandleWay) {
this.inputCatalogTable = inputCatalogTable;
this.rowErrorHandleWay = rowErrorHandleWay;
}
- @Override
- public SeaTunnelRow map(SeaTunnelRow row) {
+ public CatalogTable getProducedCatalogTable() {
+ if (outputCatalogTable == null) {
+ synchronized (this) {
+ if (outputCatalogTable == null) {
+ outputCatalogTable = transformCatalogTable();
+ }
+ }
+ }
+
+ return outputCatalogTable;
+ }
+
+ private CatalogTable transformCatalogTable() {
+ TableIdentifier tableIdentifier = transformTableIdentifier();
+ TableSchema tableSchema = transformTableSchema();
+ return CatalogTable.of(
+ tableIdentifier,
+ tableSchema,
+ inputCatalogTable.getOptions(),
+ inputCatalogTable.getPartitionKeys(),
+ inputCatalogTable.getComment());
+ }
+
+ public R transform(SeaTunnelRow row) {
try {
return transformRow(row);
} catch (ErrorDataTransformException e) {
@@ -70,31 +92,7 @@ public abstract class AbstractCatalogSupportTransform
implements SeaTunnelTransf
*
* @param inputRow upstream input row data
*/
- protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
- @Override
- public CatalogTable getProducedCatalogTable() {
- if (outputCatalogTable == null) {
- synchronized (this) {
- if (outputCatalogTable == null) {
- outputCatalogTable = transformCatalogTable();
- }
- }
- }
-
- return outputCatalogTable;
- }
-
- private CatalogTable transformCatalogTable() {
- TableIdentifier tableIdentifier = transformTableIdentifier();
- TableSchema tableSchema = transformTableSchema();
- return CatalogTable.of(
- tableIdentifier,
- tableSchema,
- inputCatalogTable.getOptions(),
- inputCatalogTable.getPartitionKeys(),
- inputCatalogTable.getComment());
- }
+ protected abstract R transformRow(SeaTunnelRow inputRow);
protected abstract TableSchema transformTableSchema();
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index 6dfaddca00..62c25be374 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -31,7 +31,7 @@ public interface SQLEngine {
SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
- SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+ List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, SeaTunnelRowType
outputRowType);
default void close() {}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 00316bba8e..71f448500c 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogMultiRowTransform;
import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
import lombok.NonNull;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
import static
org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
@Slf4j
-public class SQLTransform extends AbstractCatalogSupportTransform {
+public class SQLTransform extends AbstractCatalogMultiRowTransform {
public static final String PLUGIN_NAME = "Sql";
public static final Option<String> KEY_QUERY =
@@ -60,6 +60,8 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
private final EngineType engineType;
+ private SeaTunnelRowType outRowType;
+
private transient SQLEngine sqlEngine;
private final String inputTableName;
@@ -103,16 +105,16 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
}
@Override
- protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ protected List<SeaTunnelRow> transformRow(SeaTunnelRow inputRow) {
tryOpen();
- return sqlEngine.transformBySQL(inputRow);
+ return sqlEngine.transformBySQL(inputRow, outRowType);
}
@Override
protected TableSchema transformTableSchema() {
tryOpen();
List<String> inputColumnsMapping = new ArrayList<>();
- SeaTunnelRowType outRowType =
sqlEngine.typeMapping(inputColumnsMapping);
+ outRowType = sqlEngine.typeMapping(inputColumnsMapping);
List<String> outputColumns = Arrays.asList(outRowType.getFieldNames());
TableSchema.Builder builder = TableSchema.builder();
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 993b4e0a3c..e701e5bcbc 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -24,9 +24,12 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.commons.collections4.CollectionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -35,9 +38,9 @@ import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.FromItem;
+import net.sf.jsqlparser.statement.select.LateralView;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import javax.annotation.Nullable;
@@ -150,12 +153,6 @@ public class ZetaSQLEngine implements SQLEngine {
if (selectBody.getLimit() != null || selectBody.getOffset() !=
null) {
throw new IllegalArgumentException("Unsupported LIMIT,OFFSET
syntax");
}
-
- // for (SelectItem selectItem : selectBody.getSelectItems()) {
- // if (selectItem instanceof AllColumns) {
- // throw new IllegalArgumentException("Unsupported all
columns select syntax");
- // }
- // }
} catch (Exception e) {
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
@@ -165,7 +162,7 @@ public class ZetaSQLEngine implements SQLEngine {
@Override
public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
- List<SelectItem> selectItems = selectBody.getSelectItems();
+ List<SelectItem<?>> selectItems = selectBody.getSelectItems();
// count number of all columns
int columnsSize = countColumnsSize(selectItems);
@@ -183,7 +180,7 @@ public class ZetaSQLEngine implements SQLEngine {
int idx = 0;
for (SelectItem selectItem : selectItems) {
- if (selectItem instanceof AllColumns) {
+ if (selectItem.getExpression() instanceof AllColumns) {
for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
fieldNames[idx] = inputRowType.getFieldName(i);
seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
@@ -192,11 +189,10 @@ public class ZetaSQLEngine implements SQLEngine {
}
idx++;
}
- } else if (selectItem instanceof SelectExpressionItem) {
- SelectExpressionItem expressionItem = (SelectExpressionItem)
selectItem;
- Expression expression = expressionItem.getExpression();
- if (expressionItem.getAlias() != null) {
- String aliasName = expressionItem.getAlias().getName();
+ } else {
+ Expression expression = selectItem.getExpression();
+ if (selectItem.getAlias() != null) {
+ String aliasName = selectItem.getAlias().getName();
if (aliasName.startsWith(ESCAPE_IDENTIFIER)
&& aliasName.endsWith(ESCAPE_IDENTIFIER)) {
aliasName = aliasName.substring(1, aliasName.length()
- 1);
@@ -218,15 +214,18 @@ public class ZetaSQLEngine implements SQLEngine {
seaTunnelDataTypes[idx] =
zetaSQLType.getExpressionType(expression);
idx++;
- } else {
- idx++;
}
}
- return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ List<LateralView> lateralViews = selectBody.getLateralViews();
+ if (CollectionUtils.isEmpty(lateralViews)) {
+ return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ }
+ return zetaSQLFunction.lateralViewMapping(
+ fieldNames, seaTunnelDataTypes, lateralViews,
inputColumnsMapping);
}
@Override
- public SeaTunnelRow transformBySQL(SeaTunnelRow inputRow) {
+ public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow,
SeaTunnelRowType outRowType) {
// ------Physical Query Plan Execution------
// Scan Table
Object[] inputFields = scanTable(inputRow);
@@ -243,7 +242,12 @@ public class ZetaSQLEngine implements SQLEngine {
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
seaTunnelRow.setRowKind(inputRow.getRowKind());
seaTunnelRow.setTableId(inputRow.getTableId());
- return seaTunnelRow;
+ List<LateralView> lateralViews = selectBody.getLateralViews();
+ if (CollectionUtils.isEmpty(lateralViews)) {
+ return Lists.newArrayList(seaTunnelRow);
+ }
+ return zetaSQLFunction.lateralView(
+ Lists.newArrayList(seaTunnelRow), lateralViews, outRowType);
}
private Object[] scanTable(SeaTunnelRow inputRow) {
@@ -252,7 +256,7 @@ public class ZetaSQLEngine implements SQLEngine {
}
private Object[] project(Object[] inputFields) {
- List<SelectItem> selectItems = selectBody.getSelectItems();
+ List<SelectItem<?>> selectItems = selectBody.getSelectItems();
int columnsSize = countColumnsSize(selectItems);
@@ -260,30 +264,27 @@ public class ZetaSQLEngine implements SQLEngine {
int idx = 0;
for (SelectItem selectItem : selectItems) {
- if (selectItem instanceof AllColumns) {
+ if (selectItem.getExpression() instanceof AllColumns) {
for (Object inputField : inputFields) {
fields[idx] = inputField;
idx++;
}
- } else if (selectItem instanceof SelectExpressionItem) {
- SelectExpressionItem expressionItem = (SelectExpressionItem)
selectItem;
- Expression expression = expressionItem.getExpression();
- fields[idx] = zetaSQLFunction.computeForValue(expression,
inputFields);
- idx++;
} else {
+ Expression expression = selectItem.getExpression();
+ fields[idx] = zetaSQLFunction.computeForValue(expression,
inputFields);
idx++;
}
}
return fields;
}
- private int countColumnsSize(List<SelectItem> selectItems) {
+ private int countColumnsSize(List<SelectItem<?>> selectItems) {
if (allColumnsCount != null) {
return allColumnsCount;
}
int allColumnsCnt = 0;
for (SelectItem selectItem : selectItems) {
- if (selectItem instanceof AllColumns) {
+ if (selectItem.getExpression() instanceof AllColumns) {
allColumnsCnt++;
}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
index b3542663eb..7e84093bf4 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
@@ -30,7 +30,6 @@ import
net.sf.jsqlparser.expression.operators.conditional.AndExpression;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.InExpression;
@@ -39,6 +38,7 @@ import
net.sf.jsqlparser.expression.operators.relational.LikeExpression;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import
net.sf.jsqlparser.expression.operators.relational.ParenthesedExpressionList;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -139,7 +139,8 @@ public class ZetaSQLFilter {
private boolean inExpr(InExpression inExpression, Object[] inputFields) {
Expression leftExpr = inExpression.getLeftExpression();
- ExpressionList itemsList = (ExpressionList)
inExpression.getRightItemsList();
+ ParenthesedExpressionList<Expression> itemsList =
+ (ParenthesedExpressionList) inExpression.getRightExpression();
Object leftValue = zetaSQLFunction.computeForValue(leftExpr,
inputFields);
for (Expression exprItem : itemsList.getExpressions()) {
Object rightValue = zetaSQLFunction.computeForValue(exprItem,
inputFields);
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index ce02832712..8cbc3ed86a 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.transform.sql.zeta;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -24,12 +27,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import net.sf.jsqlparser.expression.BinaryExpression;
@@ -45,6 +50,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.SignedExpression;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TrimFunction;
import net.sf.jsqlparser.expression.WhenClause;
import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
@@ -54,14 +60,17 @@ import
net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.select.LateralView;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.UUID.randomUUID;
+import static
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
public class ZetaSQLFunction {
// ============================internal functions=====================
@@ -81,6 +90,8 @@ public class ZetaSQLFunction {
public static final String INSERT = "INSERT";
public static final String LOWER = "LOWER";
public static final String LCASE = "LCASE";
+ public static final String BINARY = "BINARY";
+ public static final String BYTE = "BYTE";
public static final String UPPER = "UPPER";
public static final String UCASE = "UCASE";
public static final String LEFT = "LEFT";
@@ -104,6 +115,7 @@ public class ZetaSQLFunction {
public static final String SUBSTR = "SUBSTR";
public static final String TO_CHAR = "TO_CHAR";
public static final String TRANSLATE = "TRANSLATE";
+ public static final String SPLIT = "SPLIT";
// -------------------------numeric functions----------------------------
public static final String ABS = "ABS";
@@ -168,6 +180,10 @@ public class ZetaSQLFunction {
public static final String YEAR = "YEAR";
public static final String FROM_UNIXTIME = "FROM_UNIXTIME";
+ // -------------------------lateralView
functions----------------------------
+ public static final String EXPLODE = "EXPLODE";
+ public static final String ARRAY = "ARRAY";
+
// -------------------------system functions----------------------------
public static final String COALESCE = "COALESCE";
public static final String IFNULL = "IFNULL";
@@ -193,6 +209,18 @@ public class ZetaSQLFunction {
if (expression instanceof NullValue) {
return null;
}
+ if (expression instanceof TrimFunction) {
+ TrimFunction function = (TrimFunction) expression;
+ Column column = (Column) function.getExpression();
+ List<Object> functionArgs = new ArrayList<>();
+ if (column != null) {
+ functionArgs.add(computeForValue(column, inputFields));
+ if (function.getFromExpression() != null) {
+ functionArgs.add(((StringValue)
function.getFromExpression()).getValue());
+ }
+ }
+ return executeFunctionExpr(TRIM, functionArgs);
+ }
if (expression instanceof SignedExpression) {
SignedExpression signedExpression = (SignedExpression) expression;
if (signedExpression.getSign() == '-') {
@@ -280,7 +308,8 @@ public class ZetaSQLFunction {
}
if (expression instanceof Function) {
Function function = (Function) expression;
- ExpressionList expressionList = function.getParameters();
+ ExpressionList<Expression> expressionList =
+ (ExpressionList<Expression>) function.getParameters();
List<Object> functionArgs = new ArrayList<>();
if (expressionList != null) {
for (Expression funcArgExpression :
expressionList.getExpressions()) {
@@ -303,6 +332,7 @@ public class ZetaSQLFunction {
Parenthesis parenthesis = (Parenthesis) expression;
return computeForValue(parenthesis.getExpression(), inputFields);
}
+ // bytes not supported at the moment,use BINARY instead.
if (expression instanceof CaseExpression) {
CaseExpression caseExpression = (CaseExpression) expression;
final Object value = executeCaseExpr(caseExpression, inputFields);
@@ -412,6 +442,8 @@ public class ZetaSQLFunction {
return StringFunction.toChar(args);
case TRANSLATE:
return StringFunction.translate(args);
+ case SPLIT:
+ return StringFunction.split(args);
case ABS:
return NumericFunction.abs(args);
case ACOS:
@@ -519,6 +551,8 @@ public class ZetaSQLFunction {
return SystemFunction.ifnull(args);
case NULLIF:
return SystemFunction.nullif(args);
+ case ARRAY:
+ return SystemFunction.array(args);
case UUID:
return randomUUID().toString();
default:
@@ -551,12 +585,12 @@ public class ZetaSQLFunction {
}
public Object executeCastExpr(CastExpression castExpression, Object arg) {
- String dataType = castExpression.getType().getDataType();
+ String dataType = castExpression.getColDataType().getDataType();
List<Object> args = new ArrayList<>(2);
args.add(arg);
args.add(dataType.toUpperCase());
if (dataType.equalsIgnoreCase("DECIMAL")) {
- List<String> ps =
castExpression.getType().getArgumentsStringList();
+ List<String> ps =
castExpression.getColDataType().getArgumentsStringList();
args.add(Integer.parseInt(ps.get(0)));
args.add(Integer.parseInt(ps.get(1)));
}
@@ -665,4 +699,190 @@ public class ZetaSQLFunction {
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
binaryExpression));
}
+
+ public List<SeaTunnelRow> lateralView(
+ List<SeaTunnelRow> seaTunnelRows,
+ List<LateralView> lateralViews,
+ SeaTunnelRowType outRowType) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ boolean isUsingOuter = lateralView.isUsingOuter();
+ String functionName = function.getName();
+ String alias = lateralView.getColumnAlias().getName();
+ if (EXPLODE.equalsIgnoreCase(functionName)) {
+ seaTunnelRows = explode(seaTunnelRows, function, outRowType,
isUsingOuter, alias);
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport function:" +
functionName);
+ }
+ }
+
+ return seaTunnelRows;
+ }
+
+ private List<SeaTunnelRow> explode(
+ List<SeaTunnelRow> seaTunnelRows,
+ Function lateralViewFunction,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ String alias) {
+ ExpressionList<?> expressions = lateralViewFunction.getParameters();
+ int aliasFieldIndex = outRowType.indexOf(alias);
+ for (Expression expression : expressions) {
+ if (expression instanceof Column) {
+ String column = ((Column) expression).getColumnName();
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ int fieldIndex = outRowType.indexOf(column);
+ Object splitFieldValue = row.getField(fieldIndex);
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ true);
+ }
+ seaTunnelRows = next;
+ } else if (expression instanceof Function) {
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ Object splitFieldValue = computeForValue(expression,
row.getFields());
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ false);
+ }
+ seaTunnelRows = next;
+ }
+ }
+ return seaTunnelRows;
+ }
+
+ private void transformExplodeValue(
+ Object splitFieldValue,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ List<SeaTunnelRow> next,
+ int aliasFieldIndex,
+ SeaTunnelRow row,
+ Expression expression,
+ boolean keepValueType) {
+ if (splitFieldValue == null) {
+ if (isUsingOuter) {
+ next.add(
+ copySeaTunnelRowWithNewValue(
+ outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ if (splitFieldValue.getClass().isArray()) {
+ if (ArrayUtils.isEmpty((Object[]) splitFieldValue)) {
+ if (isUsingOuter) {
+ next.add(
+ copySeaTunnelRowWithNewValue(
+ outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ for (Object fieldValue : (Object[]) splitFieldValue) {
+
+ if (!isUsingOuter && fieldValue == null) {
+ continue;
+ }
+ Object value =
+ fieldValue == null
+ ? null
+ : (keepValueType ? fieldValue :
String.valueOf(fieldValue));
+ next.add(
+ copySeaTunnelRowWithNewValue(
+ outRowType.getTotalFields(), row,
aliasFieldIndex, value));
+ }
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport explode function:"
+ + ((Function) expression).getName());
+ }
+ }
+
+ private SeaTunnelRow copySeaTunnelRowWithNewValue(
+ int length, SeaTunnelRow row, int fieldIndex, Object fieldValue) {
+ Object[] fields = new Object[length];
+ System.arraycopy(row.getFields(), 0, fields, 0,
row.getFields().length);
+ SeaTunnelRow outputRow = new SeaTunnelRow(fields);
+ outputRow.setRowKind(row.getRowKind());
+ outputRow.setTableId(row.getTableId());
+ outputRow.setField(fieldIndex, fieldValue);
+ return outputRow;
+ }
+
+ public SeaTunnelRowType lateralViewMapping(
+ String[] fieldNames,
+ SeaTunnelDataType<?>[] seaTunnelDataTypes,
+ List<LateralView> lateralViews,
+ List<String> inputColumnsMapping) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ String functionName = function.getName();
+ String alias = lateralView.getColumnAlias().getName();
+ if (EXPLODE.equalsIgnoreCase(functionName)) {
+ ExpressionList<?> expressions = function.getParameters();
+ int aliasIndex = Arrays.asList(fieldNames).indexOf(alias);
+ for (Expression expression : expressions) {
+ if (expression instanceof Column) {
+ String column = ((Column) expression).getColumnName();
+ int columnIndex =
Arrays.asList(fieldNames).indexOf(column);
+ if (columnIndex == -1) {
+ throw new TransformException(
+ INPUT_FIELDS_NOT_FOUND,
+ "Lateral view field must be in select
item:" + fieldNames);
+ }
+ ArrayType arrayType = (ArrayType)
seaTunnelDataTypes[columnIndex];
+ SeaTunnelDataType seaTunnelDataType =
+ PhysicalColumn.of(
+ column,
+ arrayType.getElementType(),
+ 200,
+ true,
+ "",
+ "")
+ .getDataType();
+ if (aliasIndex == -1) {
+ fieldNames = ArrayUtils.add(fieldNames, alias);
+ seaTunnelDataTypes =
+ ArrayUtils.add(seaTunnelDataTypes,
seaTunnelDataType);
+ inputColumnsMapping.add(alias);
+ } else {
+ seaTunnelDataTypes[columnIndex] =
seaTunnelDataType;
+ }
+ } else {
+ // default string type
+ SeaTunnelDataType seaTunnelDataType =
+ PhysicalColumn.of(alias,
BasicType.STRING_TYPE, 10L, true, "", "")
+ .getDataType();
+ if (aliasIndex == -1) {
+ fieldNames = ArrayUtils.add(fieldNames, alias);
+ seaTunnelDataTypes =
+ ArrayUtils.add(seaTunnelDataTypes,
seaTunnelDataType);
+ inputColumnsMapping.add(alias);
+ }
+ }
+ }
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport function:" +
functionName);
+ }
+ }
+ return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 3d9715561c..f07f27ebbb 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -43,6 +43,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.SignedExpression;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TrimFunction;
import net.sf.jsqlparser.expression.WhenClause;
import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
@@ -71,6 +72,7 @@ public class ZetaSQLType {
public static final String LONG = "LONG";
public static final String BYTE = "BYTE";
public static final String BYTES = "BYTES";
+ public static final String BINARY = "BINARY";
public static final String DOUBLE = "DOUBLE";
public static final String FLOAT = "FLOAT";
public static final String TIMESTAMP = "TIMESTAMP";
@@ -161,6 +163,9 @@ public class ZetaSQLType {
if (expression instanceof Function) {
return getFunctionType((Function) expression);
}
+ if (expression instanceof TrimFunction) {
+ return BasicType.STRING_TYPE;
+ }
if (expression instanceof TimeKeyExpression) {
return getTimeKeyExprType((TimeKeyExpression) expression);
}
@@ -191,6 +196,7 @@ public class ZetaSQLType {
if (expression instanceof CastExpression) {
return getCastType((CastExpression) expression);
}
+
if (expression instanceof BinaryExpression) {
BinaryExpression binaryExpression = (BinaryExpression) expression;
SeaTunnelDataType<?> leftType =
getExpressionType(binaryExpression.getLeftExpression());
@@ -314,10 +320,10 @@ public class ZetaSQLType {
}
private SeaTunnelDataType<?> getCastType(CastExpression castExpression) {
- String dataType = castExpression.getType().getDataType();
+ String dataType = castExpression.getColDataType().getDataType();
switch (dataType.toUpperCase()) {
case DECIMAL:
- List<String> ps =
castExpression.getType().getArgumentsStringList();
+ List<String> ps =
castExpression.getColDataType().getArgumentsStringList();
return new DecimalType(Integer.parseInt(ps.get(0)),
Integer.parseInt(ps.get(1)));
case VARCHAR:
case STRING:
@@ -331,6 +337,7 @@ public class ZetaSQLType {
case BYTE:
return BasicType.BYTE_TYPE;
case BYTES:
+ case BINARY:
return PrimitiveByteArrayType.INSTANCE;
case DOUBLE:
return BasicType.DOUBLE_TYPE;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
index d665117bfd..2bf71e3b60 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
@@ -21,6 +21,8 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction;
+import org.apache.groovy.parser.antlr4.util.StringUtils;
+
import java.nio.charset.StandardCharsets;
import java.time.temporal.Temporal;
import java.util.Arrays;
@@ -332,6 +334,18 @@ public class StringFunction {
return trim(arg, true, true, sp);
}
+ public static String[] split(List<Object> args) {
+ String arg = (String) args.get(0);
+ if (StringUtils.isEmpty(arg)) {
+ return null;
+ }
+ String delimiter = "";
+ if (args.size() >= 2) {
+ delimiter = (String) args.get(1);
+ }
+ return arg.split(delimiter);
+ }
+
public static String trim(String s, boolean leading, boolean trailing,
String sp) {
char space = sp == null || sp.isEmpty() ? ' ' : sp.charAt(0);
int begin = 0, end = s.length();
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
index 0b616b0fbe..3968fbf2e7 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
@@ -22,6 +22,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.common.collect.Lists;
+
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
@@ -66,6 +70,13 @@ public class SystemFunction {
return v1;
}
+ public static Object array(List<Object> args) {
+ if (CollectionUtils.isNotEmpty(args)) {
+ return args.toArray();
+ }
+ return Lists.newArrayList();
+ }
+
public static Object castAs(Object arg, SeaTunnelDataType<?> type) {
final ArrayList<Object> args = new ArrayList<>(4);
args.add(arg);
@@ -100,6 +111,7 @@ public class SystemFunction {
case "BYTE":
return Byte.parseByte(v1.toString());
case "BYTES":
+ case "BINARY":
return v1.toString().getBytes(StandardCharsets.UTF_8);
case "DOUBLE":
return Double.parseDouble(v1.toString());
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index ff253eac21..fcf14cc7b9 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Objects;
public class SQLTransformTest {
@@ -168,11 +169,11 @@ public class SQLTransformTest {
"select id, trim(`apply`) as `apply` from test
where `apply` = 'a'"));
SQLTransform sqlTransform = new SQLTransform(config, table);
TableSchema tableSchema = sqlTransform.transformTableSchema();
- SeaTunnelRow result =
+ List<SeaTunnelRow> result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("a")}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
- Assertions.assertEquals("a", result.getField(1));
+ Assertions.assertEquals("a", result.get(0).getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("b")}));
@@ -191,7 +192,7 @@ public class SQLTransformTest {
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
- Assertions.assertEquals("a", result.getField(1));
+ Assertions.assertEquals("a", result.get(0).getField(1));
table =
CatalogTableUtil.getCatalogTable(
@@ -211,7 +212,7 @@ public class SQLTransformTest {
new SeaTunnelRow(new Object[] {Integer.valueOf(1),
Long.valueOf(1)}));
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(BasicType.LONG_TYPE,
tableSchema.getColumns().get(1).getDataType());
- Assertions.assertEquals(Long.valueOf(2), result.getField(1));
+ Assertions.assertEquals(Long.valueOf(2), result.get(0).getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1),
Long.valueOf(0)}));
@@ -243,7 +244,7 @@ public class SQLTransformTest {
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
- Assertions.assertEquals("a", result.getField(1));
+ Assertions.assertEquals("a", result.get(0).getField(1));
result =
sqlTransform.transformRow(
new SeaTunnelRow(
@@ -278,6 +279,6 @@ public class SQLTransformTest {
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals(
BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
- Assertions.assertEquals("a", result.getField(1));
+ Assertions.assertEquals("a", result.get(0).getField(1));
}
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
index 70b744a04c..c062164608 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
@@ -48,7 +48,7 @@ public class DateTimeFunctionTest {
null,
rowType,
"select from_unixtime(unixtime,'yyyy-MM-dd') as ts from test");
- SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow);
+ SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow,
rowType).get(0);
Object field = outRow.getField(0);
Assertions.assertNotNull(field.toString());
@@ -58,7 +58,7 @@ public class DateTimeFunctionTest {
null,
rowType,
"select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+6')
as ts from test");
- SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow);
+ SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow,
rowType).get(0);
Object field1 = outRow1.getField(0);
Assertions.assertEquals("2023-01-01 10:00:00", field1.toString());
}
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 1f49332ef9..b2012936bd 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -30,7 +30,7 @@ seatunnel-jackson-2.3.9-SNAPSHOT-optional.jar
seatunnel-guava-2.3.9-SNAPSHOT-optional.jar
seatunnel-hazelcast-shade-2.3.9-SNAPSHOT-optional.jar
slf4j-api-1.7.25.jar
-jsqlparser-4.5.jar
+jsqlparser-4.9.jar
animal-sniffer-annotations-1.17.jar
checker-qual-3.10.0.jar
error_prone_annotations-2.2.0.jar