This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 132278c06a [Feature][transform] transform support explode (#7928)
132278c06a is described below

commit 132278c06a6f2af12934b948f65f8baf4b69db79
Author: CosmosNi <[email protected]>
AuthorDate: Sat Nov 9 16:55:35 2024 +0800

    [Feature][transform] transform support explode (#7928)
    
    Co-authored-by: njh_cmss <[email protected]>
---
 docs/en/transform-v2/sql-functions.md              |  31 +++
 docs/zh/transform-v2/sql-functions.md              |  32 +++
 pom.xml                                            |   2 +-
 .../api/transform/SeaTunnelMultiRowTransform.java  |  28 ++-
 .../seatunnel/config/sql/SqlConfigBuilder.java     |   5 +-
 .../seatunnel/jdbc/source/FixedChunkSplitter.java  |   4 +-
 .../converter/SqlToPaimonPredicateConverter.java   |  15 +-
 .../flink/execution/TransformExecuteProcessor.java |  29 +++
 .../spark/execution/TransformExecuteProcessor.java |  61 +++---
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |  12 ++
 .../resources/sql_transform/explode_transform.conf | 103 ++++++++++
 .../explode_transform_with_outer.conf              |  99 +++++++++
 .../explode_transform_without_outer.conf           |  95 +++++++++
 .../test/resources/sql_transform/func_system.conf  |   6 +-
 .../server/task/flow/TransformFlowLifeCycle.java   |  66 ++++--
 .../AbstractCatalogMultiRowTransform.java}         |  33 +--
 .../common/AbstractCatalogSupportTransform.java    |  70 +------
 ...nsform.java => AbstractSeaTunnelTransform.java} |  60 +++---
 .../apache/seatunnel/transform/sql/SQLEngine.java  |   2 +-
 .../seatunnel/transform/sql/SQLTransform.java      |  12 +-
 .../transform/sql/zeta/ZetaSQLEngine.java          |  57 +++---
 .../transform/sql/zeta/ZetaSQLFilter.java          |   5 +-
 .../transform/sql/zeta/ZetaSQLFunction.java        | 226 ++++++++++++++++++++-
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  11 +-
 .../sql/zeta/functions/StringFunction.java         |  14 ++
 .../sql/zeta/functions/SystemFunction.java         |  12 ++
 .../seatunnel/transform/sql/SQLTransformTest.java  |  13 +-
 .../transform/sql/zeta/DateTimeFunctionTest.java   |   4 +-
 tools/dependencies/known-dependencies.txt          |   2 +-
 29 files changed, 872 insertions(+), 237 deletions(-)

diff --git a/docs/en/transform-v2/sql-functions.md 
b/docs/en/transform-v2/sql-functions.md
index ce01df937f..31a3398937 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -302,6 +302,14 @@ Example:
 
 REPLACE(NAME, ' ')
 
+### SPLIT
+
+Split a string into an array.
+
+Example:
+
+select SPLIT(test,';') as arrays
+
 ### SOUNDEX
 
 ```SOUNDEX(string)```
@@ -984,3 +992,26 @@ Example:
 
 select UUID() as seatunnel_uuid
 
+### ARRAY
+
+Generate an array.
+
+Example:
+
+select ARRAY('test1','test2','test3') as arrays
+
+
+### LATERAL VIEW 
+#### EXPLODE
+
+explode array column to rows.
+OUTER EXPLODE will return NULL, while array is NULL or empty
+EXPLODE(SPLIT(FIELD_NAME,separator))Used to split string type. The first 
parameter of SPLIT function  is the field name, the second parameter is the 
separator
+EXPLODE(ARRAY(value1,value2)) Used to custom array type.
+```
+SELECT * FROM fake 
+       LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME 
+       LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id 
+       LATERAL VIEW OUTER EXPLODE ( age ) AS age
+       LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
+```
diff --git a/docs/zh/transform-v2/sql-functions.md 
b/docs/zh/transform-v2/sql-functions.md
index 13dc3a9bc5..7e3f8454e1 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -302,6 +302,15 @@ REPEAT(NAME || ' ', 10)
 
 REPLACE(NAME, ' ')
 
+
+### SPLIT
+
+将字符串切分成数组。
+
+示例:
+
+select SPLIT(test,';') as arrays
+
 ### SOUNDEX
 
 ```SOUNDEX(string)```
@@ -975,3 +984,26 @@ case when c_string in ('c_string') then 1 else 0 end
 
 select UUID() as seatunnel_uuid
 
+
+### ARRAY
+
+生成一个数组。
+
+示例:
+
+select ARRAY('test1','test2','test3') as arrays
+
+### LATERAL VIEW
+#### EXPLODE
+
+将 array 列展开成多行。
+OUTER EXPLODE 当 array 为NULL或者为空时,返回NULL
+EXPLODE(SPLIT(FIELD_NAME,separator))用来切分字符串类型,SPLIT 第一个参数是字段名,第二个参数是分隔符
+EXPLODE(ARRAY(value1,value2)) 用于自定义数组切分,在原有基础上生成一个新的字段。
+```
+SELECT * FROM fake 
+       LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME 
+       LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id 
+       LATERAL VIEW OUTER EXPLODE ( age ) AS age
+       LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
+```
diff --git a/pom.xml b/pom.xml
index 872e94112b..fdb967a1d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
         
<maven-scm-provider-jgit.version>2.0.0</maven-scm-provider-jgit.version>
         <testcontainer.version>1.17.6</testcontainer.version>
         <spotless.version>2.29.0</spotless.version>
-        <jsqlparser.version>4.5</jsqlparser.version>
+        <jsqlparser.version>4.9</jsqlparser.version>
         <json-path.version>2.7.0</json-path.version>
         <groovy.version>4.0.16</groovy.version>
         <jetty.version>9.4.56.v20240826</jetty.version>
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
similarity index 62%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
index 6dfaddca00..1f78e8be48 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelMultiRowTransform.java
@@ -14,24 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.seatunnel.transform.sql;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+package org.apache.seatunnel.api.transform;
 
 import java.util.List;
 
-public interface SQLEngine {
-    void init(
-            String inputTableName,
-            String catalogTableName,
-            SeaTunnelRowType inputRowType,
-            String sql);
-
-    SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
+public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {
 
-    SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+    /**
+     * Transform input data to {@link 
this#getProducedCatalogTable().getSeaTunnelRowType()} types
+     * data.
+     *
+     * @param row the data need be transformed.
+     * @return transformed data.
+     */
+    List<T> flatMap(T row);
 
-    default void close() {}
+    default T map(T row) {
+        throw new UnsupportedOperationException("Heads-up conversion is not 
supported");
+    }
 }
diff --git 
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
 
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
index f0d68e089b..00f6a40a43 100644
--- 
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
+++ 
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
@@ -43,7 +43,6 @@ import net.sf.jsqlparser.statement.create.table.CreateTable;
 import net.sf.jsqlparser.statement.insert.Insert;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
 import java.nio.file.Files;
@@ -360,12 +359,12 @@ public class SqlConfigBuilder {
             String sourceTableName;
             String resultTableName;
             if (plainSelect.getFromItem() == null) {
-                List<SelectItem> selectItems = plainSelect.getSelectItems();
+                List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
                 if (selectItems.size() != 1) {
                     throw new ParserException(
                             "Source table must be specified in SQL: " + 
insertSql);
                 }
-                SelectExpressionItem selectItem = (SelectExpressionItem) 
selectItems.get(0);
+                SelectItem<?> selectItem = selectItems.get(0);
                 Column column = (Column) selectItem.getExpression();
                 sourceTableName = column.getColumnName();
                 resultTableName = sourceTableName;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
index edeef96f0a..72a4e061ac 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
@@ -82,8 +82,8 @@ public class FixedChunkSplitter extends ChunkSplitter {
             partitionEnd = range.getRight();
         }
         if (partitionStart == null || partitionEnd == null) {
-            JdbcSourceSplit spilt = createSingleSplit(table);
-            return Collections.singletonList(spilt);
+            JdbcSourceSplit split = createSingleSplit(table);
+            return Collections.singletonList(split);
         }
 
         return createNumberColumnSplits(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 212bfd6e8b..0bf47b1310 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -54,8 +54,6 @@ import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.select.AllColumns;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectBody;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
 import java.math.BigDecimal;
@@ -83,7 +81,7 @@ public class SqlToPaimonPredicateConverter {
             throw new IllegalArgumentException("Only SELECT statements are 
supported.");
         }
         Select select = (Select) statement;
-        SelectBody selectBody = select.getSelectBody();
+        Select selectBody = select.getSelectBody();
         if (!(selectBody instanceof PlainSelect)) {
             throw new IllegalArgumentException("Only simple SELECT statements 
are supported.");
         }
@@ -101,18 +99,15 @@ public class SqlToPaimonPredicateConverter {
     public static int[] convertSqlSelectToPaimonProjectionIndex(
             String[] fieldNames, PlainSelect plainSelect) {
         int[] projectionIndex = null;
-        List<SelectItem> selectItems = plainSelect.getSelectItems();
+        List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
 
         List<String> columnNames = new ArrayList<>();
         for (SelectItem selectItem : selectItems) {
-            if (selectItem instanceof AllColumns) {
+            if (selectItem.getExpression() instanceof AllColumns) {
                 return null;
-            } else if (selectItem instanceof SelectExpressionItem) {
-                SelectExpressionItem selectExpressionItem = 
(SelectExpressionItem) selectItem;
-                String columnName = 
selectExpressionItem.getExpression().toString();
-                columnNames.add(columnName);
             } else {
-                throw new IllegalArgumentException("Error encountered parsing 
query fields.");
+                String columnName = ((Column) 
selectItem.getExpression()).getColumnName();
+                columnNames.add(columnName);
             }
         }
 
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 00a5046e43..982179dd2e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -25,15 +25,19 @@ import 
org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.util.Collector;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -140,6 +144,11 @@ public class TransformExecuteProcessor
 
     protected DataStream<SeaTunnelRow> flinkTransform(
             SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
+        if (transform instanceof SeaTunnelMultiRowTransform) {
+            return stream.flatMap(
+                    new ArrayFlatMap(transform), 
TypeInformation.of(SeaTunnelRow.class));
+        }
+
         return stream.transform(
                 String.format("%s-Transform", transform.getPluginName()),
                 TypeInformation.of(SeaTunnelRow.class),
@@ -151,4 +160,24 @@ public class TransformExecuteProcessor
                                                 
((SeaTunnelTransform<SeaTunnelRow>) transform)
                                                         .map(row))));
     }
+
+    public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+
+        private SeaTunnelTransform transform;
+
+        public ArrayFlatMap(SeaTunnelTransform transform) {
+            this.transform = transform;
+        }
+
+        @Override
+        public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> 
collector) {
+            List<SeaTunnelRow> rows =
+                    ((SeaTunnelMultiRowTransform<SeaTunnelRow>) 
transform).flatMap(row);
+            if (CollectionUtils.isNotEmpty(rows)) {
+                for (SeaTunnelRow rowResult : rows) {
+                    collector.collect(rowResult);
+                }
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 3736576817..44a60195ee 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -35,7 +36,8 @@ import 
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
 import 
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
-import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
@@ -45,7 +47,6 @@ import org.apache.spark.sql.types.StructType;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -164,57 +165,49 @@ public class TransformExecuteProcessor
         SeaTunnelRowConverter inputRowConverter = new 
SeaTunnelRowConverter(inputDataType);
         SeaTunnelRowConverter outputRowConverter = new 
SeaTunnelRowConverter(outputDataTYpe);
         ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
-        return stream.mapPartitions(
-                        (MapPartitionsFunction<Row, Row>)
-                                (Iterator<Row> rowIterator) ->
-                                        new TransformIterator(
-                                                rowIterator,
-                                                transform,
-                                                outputSchema,
-                                                inputRowConverter,
-                                                outputRowConverter),
+
+        return stream.flatMap(
+                        new TransformMapPartitionsFunction(
+                                transform, inputRowConverter, 
outputRowConverter),
                         encoder)
                 .filter(Objects::nonNull);
     }
 
-    private static class TransformIterator implements Iterator<Row>, 
Serializable {
-        private Iterator<Row> sourceIterator;
+    private static class TransformMapPartitionsFunction implements 
FlatMapFunction<Row, Row> {
         private SeaTunnelTransform<SeaTunnelRow> transform;
-        private StructType structType;
         private SeaTunnelRowConverter inputRowConverter;
         private SeaTunnelRowConverter outputRowConverter;
 
-        public TransformIterator(
-                Iterator<Row> sourceIterator,
+        public TransformMapPartitionsFunction(
                 SeaTunnelTransform<SeaTunnelRow> transform,
-                StructType structType,
                 SeaTunnelRowConverter inputRowConverter,
                 SeaTunnelRowConverter outputRowConverter) {
-            this.sourceIterator = sourceIterator;
             this.transform = transform;
-            this.structType = structType;
             this.inputRowConverter = inputRowConverter;
             this.outputRowConverter = outputRowConverter;
         }
 
         @Override
-        public boolean hasNext() {
-            return sourceIterator.hasNext();
-        }
-
-        @Override
-        public Row next() {
-            try {
-                Row row = sourceIterator.next();
-                SeaTunnelRow seaTunnelRow = 
inputRowConverter.unpack((GenericRowWithSchema) row);
-                seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
-                if (seaTunnelRow == null) {
-                    return null;
+        public Iterator<Row> call(Row row) throws Exception {
+            List<Row> rows = new ArrayList<>();
+
+            SeaTunnelRow seaTunnelRow = 
inputRowConverter.unpack((GenericRowWithSchema) row);
+            if (transform instanceof SeaTunnelMultiRowTransform) {
+                List<SeaTunnelRow> seaTunnelRows =
+                        ((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
+                                .flatMap(seaTunnelRow);
+                if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
+                    for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
+                        
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
+                    }
+                }
+            } else {
+                SeaTunnelRow seaTunnelRowTransform = 
transform.map(seaTunnelRow);
+                if (seaTunnelRowTransform != null) {
+                    rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
                 }
-                return outputRowConverter.parcel(seaTunnelRow);
-            } catch (Exception e) {
-                throw new TaskExecuteException("Row convert failed, caused: " 
+ e.getMessage(), e);
             }
+            return rows.iterator();
         }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index df404a2852..a12eabe7ef 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -59,6 +59,18 @@ public class TestSQLIT extends TestSuiteBase {
         Assertions.assertEquals(0, sqlAllColumns.getExitCode());
         Container.ExecResult caseWhenSql = 
container.executeJob("/sql_transform/case_when.conf");
         Assertions.assertEquals(0, caseWhenSql.getExitCode());
+
+        Container.ExecResult execResultBySql =
+                container.executeJob("/sql_transform/explode_transform.conf");
+        Assertions.assertEquals(0, execResultBySql.getExitCode());
+
+        Container.ExecResult execResultBySqlWithoutOuter =
+                
container.executeJob("/sql_transform/explode_transform_without_outer.conf");
+        Assertions.assertEquals(0, execResultBySqlWithoutOuter.getExitCode());
+
+        Container.ExecResult execResultBySqlWithOuter =
+                
container.executeJob("/sql_transform/explode_transform_with_outer.conf");
+        Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode());
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
new file mode 100644
index 0000000000..8df8bc5076
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform.conf
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+  parallelism = 1
+}
+
+source {
+  FakeSource {
+      result_table_name = "fake"
+    schema = {
+      fields {
+        pk_id = string
+        name = string
+        age = array<String>
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["id001;id001", "zhangsan,zhangsan",["1","1"]]
+      },
+      {
+        kind = INSERT
+        fields = ["id001", "zhangsan,zhangsan",["1"]]
+      },
+      {
+        kind = INSERT
+        fields = ["id001;id001", "zhangsan",["1"]]
+      }
+    ]
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "SELECT * FROM fake LATERAL VIEW OUTER EXPLODE(SPLIT(name, ',')) 
as name LATERAL VIEW OUTER EXPLODE(SPLIT(pk_id, ';')) as pk_id LATERAL VIEW 
OUTER EXPLODE(age) as age LATERAL VIEW  EXPLODE(ARRAY(1,1)) as num"
+  }
+}
+
+sink{
+  assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 24
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 24
+          }
+        ],
+        field_rules = [
+        {
+          field_name = pk_id
+          field_type = string
+          field_value = [{equals_to = id001}]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [{equals_to = zhangsan}]
+        },
+        {
+          field_name = age
+          field_type = string
+          field_value = [{equals_to = 1}]
+        },
+        {
+          field_name = num
+          field_type = string
+          field_value = [{equals_to = 1}]
+        }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
new file mode 100644
index 0000000000..177cc381f0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_with_outer.conf
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+  parallelism = 1
+}
+
+source {
+  FakeSource {
+      result_table_name = "fake"
+    schema = {
+      fields {
+        pk_id = string
+        name = string
+        age = array<String>
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["id001", "zhangsan",[null,null]]
+      }
+    ]
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "SELECT * FROM fake LATERAL VIEW OUTER EXPLODE(age) as age LATERAL 
VIEW OUTER EXPLODE(ARRAY(null,null)) as num"
+  }
+}
+
+sink{
+  assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 4
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 4
+          }
+        ],
+        field_rules = [
+        {
+          field_name = pk_id
+          field_type = string
+          field_value = [{equals_to = id001}]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [{equals_to = zhangsan}]
+        },
+        {
+          field_name = age
+          field_type = "null"
+          field_value = [
+            {rule_type = NULL}
+          ]
+        },
+        {
+          field_name = num
+          field_type = "null"
+          field_value = [
+            {rule_type = NULL}
+          ]
+        }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
new file mode 100644
index 0000000000..1e35a2cc50
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/explode_transform_without_outer.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+  parallelism = 1
+}
+
+source {
+  FakeSource {
+      result_table_name = "fake"
+    schema = {
+      fields {
+        pk_id = string
+        name = string
+        age = array<String>
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["id001", "zhangsan",[1,null]]
+      }
+    ]
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "SELECT * FROM fake LATERAL VIEW  EXPLODE(age) as age LATERAL VIEW 
 EXPLODE(ARRAY(1,1,null)) as num"
+  }
+}
+
+sink{
+  assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 2
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 2
+          }
+        ],
+        field_rules = [
+        {
+          field_name = pk_id
+          field_type = string
+          field_value = [{equals_to = id001}]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [{equals_to = zhangsan}]
+        },
+        {
+          field_name = age
+          field_type = "string"
+          field_value = [{equals_to = 1}]
+        },
+        {
+          field_name = num
+          field_type = "string"
+          field_value = [{equals_to = 1}]
+        }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
index a189c7c2dd..c3c97f7f10 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
@@ -49,7 +49,7 @@ transform {
   Sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as bytes) as c7, name as `apply` from fake"
+    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as BINARY) as c7, name as `apply` from fake"
   }
 }
 
@@ -76,7 +76,7 @@ sink {
           field_name = "id3"
           field_type = "double"
           field_value = [
-            {equals_to = 1}
+            {equals_to = 1.0}
           ]
         },
         {
@@ -175,4 +175,4 @@ sink {
       ]
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 0447513b5f..093ac1cca6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -27,9 +28,12 @@ import 
org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -89,22 +93,60 @@ public class TransformFlowLifeCycle<T> extends 
ActionFlowLifeCycle
                 return;
             }
             T inputData = (T) record.getData();
-            T outputData = inputData;
-            for (SeaTunnelTransform<T> t : transform) {
-                outputData = t.map(inputData);
-                log.debug("Transform[{}] input row {} and output row {}", t, 
inputData, outputData);
-                if (outputData == null) {
-                    log.trace("Transform[{}] filtered data row {}", t, 
inputData);
-                    break;
+            List<T> outputDataList = transform(inputData);
+            if (!outputDataList.isEmpty()) {
+                // todo log metrics
+                for (T outputData : outputDataList) {
+                    collector.collect(new Record<>(outputData));
                 }
-
-                inputData = outputData;
             }
-            if (outputData != null) {
-                // todo log metrics
-                collector.collect(new Record<>(outputData));
+        }
+    }
+
+    public List<T> transform(T inputData) {
+        if (transform.isEmpty()) {
+            return Collections.singletonList(inputData);
+        }
+
+        List<T> dataList = new ArrayList<>();
+        dataList.add(inputData);
+
+        for (SeaTunnelTransform<T> transformer : transform) {
+            List<T> nextInputDataList = new ArrayList<>();
+            if (transformer instanceof SeaTunnelMultiRowTransform) {
+                SeaTunnelMultiRowTransform<T> transformDecorator =
+                        (SeaTunnelMultiRowTransform<T>) transformer;
+                for (T data : dataList) {
+                    List<T> outputDataArray = transformDecorator.flatMap(data);
+                    log.debug(
+                            "Transform[{}] input row {} and output row {}",
+                            transformer,
+                            data,
+                            outputDataArray);
+                    if (CollectionUtils.isNotEmpty(outputDataArray)) {
+                        nextInputDataList.addAll(outputDataArray);
+                    }
+                }
+            } else {
+                for (T data : dataList) {
+                    T outputData = transformer.map(data);
+                    log.debug(
+                            "Transform[{}] input row {} and output row {}",
+                            transformer,
+                            data,
+                            outputData);
+                    if (outputData == null) {
+                        log.trace("Transform[{}] filtered data row {}", 
transformer, data);
+                        continue;
+                    }
+                    nextInputDataList.add(outputData);
+                }
             }
+
+            dataList = nextInputDataList;
         }
+
+        return dataList;
     }
 
     @Override
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
similarity index 50%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
index 6dfaddca00..bf7dc2ab96 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogMultiRowTransform.java
@@ -14,24 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.seatunnel.transform.common;
 
-package org.apache.seatunnel.transform.sql;
-
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 
-public interface SQLEngine {
-    void init(
-            String inputTableName,
-            String catalogTableName,
-            SeaTunnelRowType inputRowType,
-            String sql);
+@Slf4j
+public abstract class AbstractCatalogMultiRowTransform
+        extends AbstractSeaTunnelTransform<SeaTunnelRow, List<SeaTunnelRow>>
+        implements SeaTunnelMultiRowTransform<SeaTunnelRow> {
 
-    SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
+    public AbstractCatalogMultiRowTransform(@NonNull CatalogTable 
inputCatalogTable) {
+        super(inputCatalogTable);
+    }
 
-    SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+    public AbstractCatalogMultiRowTransform(
+            @NonNull CatalogTable inputCatalogTable, ErrorHandleWay 
rowErrorHandleWay) {
+        super(inputCatalogTable, rowErrorHandleWay);
+    }
 
-    default void close() {}
+    @Override
+    public List<SeaTunnelRow> flatMap(SeaTunnelRow row) {
+        return transform(row);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index a0fa464af7..358bcd4298 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -18,85 +18,27 @@
 package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public abstract class AbstractCatalogSupportTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
-    protected final ErrorHandleWay rowErrorHandleWay;
-    protected CatalogTable inputCatalogTable;
-
-    protected volatile CatalogTable outputCatalogTable;
-
+public abstract class AbstractCatalogSupportTransform
+        extends AbstractSeaTunnelTransform<SeaTunnelRow, SeaTunnelRow>
+        implements SeaTunnelTransform<SeaTunnelRow> {
     public AbstractCatalogSupportTransform(@NonNull CatalogTable 
inputCatalogTable) {
-        this(inputCatalogTable, 
CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
+        super(inputCatalogTable);
     }
 
     public AbstractCatalogSupportTransform(
             @NonNull CatalogTable inputCatalogTable, ErrorHandleWay 
rowErrorHandleWay) {
-        this.inputCatalogTable = inputCatalogTable;
-        this.rowErrorHandleWay = rowErrorHandleWay;
+        super(inputCatalogTable, rowErrorHandleWay);
     }
 
     @Override
     public SeaTunnelRow map(SeaTunnelRow row) {
-        try {
-            return transformRow(row);
-        } catch (ErrorDataTransformException e) {
-            if (e.getErrorHandleWay() != null) {
-                ErrorHandleWay errorHandleWay = e.getErrorHandleWay();
-                if (errorHandleWay.allowSkipThisRow()) {
-                    log.debug("Skip row due to error", e);
-                    return null;
-                }
-                throw e;
-            }
-            if (rowErrorHandleWay.allowSkip()) {
-                log.debug("Skip row due to error", e);
-                return null;
-            }
-            throw e;
-        }
+        return transform(row);
     }
-
-    /**
-     * Outputs transformed row data.
-     *
-     * @param inputRow upstream input row data
-     */
-    protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
-    @Override
-    public CatalogTable getProducedCatalogTable() {
-        if (outputCatalogTable == null) {
-            synchronized (this) {
-                if (outputCatalogTable == null) {
-                    outputCatalogTable = transformCatalogTable();
-                }
-            }
-        }
-
-        return outputCatalogTable;
-    }
-
-    private CatalogTable transformCatalogTable() {
-        TableIdentifier tableIdentifier = transformTableIdentifier();
-        TableSchema tableSchema = transformTableSchema();
-        return CatalogTable.of(
-                tableIdentifier,
-                tableSchema,
-                inputCatalogTable.getOptions(),
-                inputCatalogTable.getPartitionKeys(),
-                inputCatalogTable.getComment());
-    }
-
-    protected abstract TableSchema transformTableSchema();
-
-    protected abstract TableIdentifier transformTableIdentifier();
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
similarity index 89%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index a0fa464af7..a6dc217adb 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.seatunnel.transform.common;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -28,24 +27,47 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public abstract class AbstractCatalogSupportTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
+public abstract class AbstractSeaTunnelTransform<T, R> implements 
SeaTunnelTransform<T> {
+
     protected final ErrorHandleWay rowErrorHandleWay;
     protected CatalogTable inputCatalogTable;
 
     protected volatile CatalogTable outputCatalogTable;
 
-    public AbstractCatalogSupportTransform(@NonNull CatalogTable 
inputCatalogTable) {
+    public AbstractSeaTunnelTransform(@NonNull CatalogTable inputCatalogTable) 
{
         this(inputCatalogTable, 
CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
     }
 
-    public AbstractCatalogSupportTransform(
+    public AbstractSeaTunnelTransform(
             @NonNull CatalogTable inputCatalogTable, ErrorHandleWay 
rowErrorHandleWay) {
         this.inputCatalogTable = inputCatalogTable;
         this.rowErrorHandleWay = rowErrorHandleWay;
     }
 
-    @Override
-    public SeaTunnelRow map(SeaTunnelRow row) {
+    public CatalogTable getProducedCatalogTable() {
+        if (outputCatalogTable == null) {
+            synchronized (this) {
+                if (outputCatalogTable == null) {
+                    outputCatalogTable = transformCatalogTable();
+                }
+            }
+        }
+
+        return outputCatalogTable;
+    }
+
+    private CatalogTable transformCatalogTable() {
+        TableIdentifier tableIdentifier = transformTableIdentifier();
+        TableSchema tableSchema = transformTableSchema();
+        return CatalogTable.of(
+                tableIdentifier,
+                tableSchema,
+                inputCatalogTable.getOptions(),
+                inputCatalogTable.getPartitionKeys(),
+                inputCatalogTable.getComment());
+    }
+
+    public R transform(SeaTunnelRow row) {
         try {
             return transformRow(row);
         } catch (ErrorDataTransformException e) {
@@ -70,31 +92,7 @@ public abstract class AbstractCatalogSupportTransform 
implements SeaTunnelTransf
      *
      * @param inputRow upstream input row data
      */
-    protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
-    @Override
-    public CatalogTable getProducedCatalogTable() {
-        if (outputCatalogTable == null) {
-            synchronized (this) {
-                if (outputCatalogTable == null) {
-                    outputCatalogTable = transformCatalogTable();
-                }
-            }
-        }
-
-        return outputCatalogTable;
-    }
-
-    private CatalogTable transformCatalogTable() {
-        TableIdentifier tableIdentifier = transformTableIdentifier();
-        TableSchema tableSchema = transformTableSchema();
-        return CatalogTable.of(
-                tableIdentifier,
-                tableSchema,
-                inputCatalogTable.getOptions(),
-                inputCatalogTable.getPartitionKeys(),
-                inputCatalogTable.getComment());
-    }
+    protected abstract R transformRow(SeaTunnelRow inputRow);
 
     protected abstract TableSchema transformTableSchema();
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index 6dfaddca00..62c25be374 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -31,7 +31,7 @@ public interface SQLEngine {
 
     SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
 
-    SeaTunnelRow transformBySQL(SeaTunnelRow inputRow);
+    List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, SeaTunnelRowType 
outputRowType);
 
     default void close() {}
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 00316bba8e..71f448500c 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
+import org.apache.seatunnel.transform.common.AbstractCatalogMultiRowTransform;
 import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType;
 
 import lombok.NonNull;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;
 
 @Slf4j
-public class SQLTransform extends AbstractCatalogSupportTransform {
+public class SQLTransform extends AbstractCatalogMultiRowTransform {
     public static final String PLUGIN_NAME = "Sql";
 
     public static final Option<String> KEY_QUERY =
@@ -60,6 +60,8 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
 
     private final EngineType engineType;
 
+    private SeaTunnelRowType outRowType;
+
     private transient SQLEngine sqlEngine;
 
     private final String inputTableName;
@@ -103,16 +105,16 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
     }
 
     @Override
-    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+    protected List<SeaTunnelRow> transformRow(SeaTunnelRow inputRow) {
         tryOpen();
-        return sqlEngine.transformBySQL(inputRow);
+        return sqlEngine.transformBySQL(inputRow, outRowType);
     }
 
     @Override
     protected TableSchema transformTableSchema() {
         tryOpen();
         List<String> inputColumnsMapping = new ArrayList<>();
-        SeaTunnelRowType outRowType = 
sqlEngine.typeMapping(inputColumnsMapping);
+        outRowType = sqlEngine.typeMapping(inputColumnsMapping);
         List<String> outputColumns = Arrays.asList(outRowType.getFieldNames());
 
         TableSchema.Builder builder = TableSchema.builder();
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 993b4e0a3c..e701e5bcbc 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -24,9 +24,12 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 import org.apache.seatunnel.transform.sql.SQLEngine;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -35,9 +38,9 @@ import net.sf.jsqlparser.schema.Table;
 import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.select.AllColumns;
 import net.sf.jsqlparser.statement.select.FromItem;
+import net.sf.jsqlparser.statement.select.LateralView;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
-import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
 import javax.annotation.Nullable;
@@ -150,12 +153,6 @@ public class ZetaSQLEngine implements SQLEngine {
             if (selectBody.getLimit() != null || selectBody.getOffset() != 
null) {
                 throw new IllegalArgumentException("Unsupported LIMIT,OFFSET 
syntax");
             }
-
-            // for (SelectItem selectItem : selectBody.getSelectItems()) {
-            //     if (selectItem instanceof AllColumns) {
-            //         throw new IllegalArgumentException("Unsupported all 
columns select syntax");
-            //     }
-            // }
         } catch (Exception e) {
             throw new TransformException(
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
@@ -165,7 +162,7 @@ public class ZetaSQLEngine implements SQLEngine {
 
     @Override
     public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
-        List<SelectItem> selectItems = selectBody.getSelectItems();
+        List<SelectItem<?>> selectItems = selectBody.getSelectItems();
 
         // count number of all columns
         int columnsSize = countColumnsSize(selectItems);
@@ -183,7 +180,7 @@ public class ZetaSQLEngine implements SQLEngine {
 
         int idx = 0;
         for (SelectItem selectItem : selectItems) {
-            if (selectItem instanceof AllColumns) {
+            if (selectItem.getExpression() instanceof AllColumns) {
                 for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
                     fieldNames[idx] = inputRowType.getFieldName(i);
                     seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
@@ -192,11 +189,10 @@ public class ZetaSQLEngine implements SQLEngine {
                     }
                     idx++;
                 }
-            } else if (selectItem instanceof SelectExpressionItem) {
-                SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
-                Expression expression = expressionItem.getExpression();
-                if (expressionItem.getAlias() != null) {
-                    String aliasName = expressionItem.getAlias().getName();
+            } else {
+                Expression expression = selectItem.getExpression();
+                if (selectItem.getAlias() != null) {
+                    String aliasName = selectItem.getAlias().getName();
                     if (aliasName.startsWith(ESCAPE_IDENTIFIER)
                             && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
                         aliasName = aliasName.substring(1, aliasName.length() 
- 1);
@@ -218,15 +214,18 @@ public class ZetaSQLEngine implements SQLEngine {
 
                 seaTunnelDataTypes[idx] = 
zetaSQLType.getExpressionType(expression);
                 idx++;
-            } else {
-                idx++;
             }
         }
-        return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+        List<LateralView> lateralViews = selectBody.getLateralViews();
+        if (CollectionUtils.isEmpty(lateralViews)) {
+            return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+        }
+        return zetaSQLFunction.lateralViewMapping(
+                fieldNames, seaTunnelDataTypes, lateralViews, 
inputColumnsMapping);
     }
 
     @Override
-    public SeaTunnelRow transformBySQL(SeaTunnelRow inputRow) {
+    public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, 
SeaTunnelRowType outRowType) {
         // ------Physical Query Plan Execution------
         // Scan Table
         Object[] inputFields = scanTable(inputRow);
@@ -243,7 +242,12 @@ public class ZetaSQLEngine implements SQLEngine {
         SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields);
         seaTunnelRow.setRowKind(inputRow.getRowKind());
         seaTunnelRow.setTableId(inputRow.getTableId());
-        return seaTunnelRow;
+        List<LateralView> lateralViews = selectBody.getLateralViews();
+        if (CollectionUtils.isEmpty(lateralViews)) {
+            return Lists.newArrayList(seaTunnelRow);
+        }
+        return zetaSQLFunction.lateralView(
+                Lists.newArrayList(seaTunnelRow), lateralViews, outRowType);
     }
 
     private Object[] scanTable(SeaTunnelRow inputRow) {
@@ -252,7 +256,7 @@ public class ZetaSQLEngine implements SQLEngine {
     }
 
     private Object[] project(Object[] inputFields) {
-        List<SelectItem> selectItems = selectBody.getSelectItems();
+        List<SelectItem<?>> selectItems = selectBody.getSelectItems();
 
         int columnsSize = countColumnsSize(selectItems);
 
@@ -260,30 +264,27 @@ public class ZetaSQLEngine implements SQLEngine {
 
         int idx = 0;
         for (SelectItem selectItem : selectItems) {
-            if (selectItem instanceof AllColumns) {
+            if (selectItem.getExpression() instanceof AllColumns) {
                 for (Object inputField : inputFields) {
                     fields[idx] = inputField;
                     idx++;
                 }
-            } else if (selectItem instanceof SelectExpressionItem) {
-                SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
-                Expression expression = expressionItem.getExpression();
-                fields[idx] = zetaSQLFunction.computeForValue(expression, 
inputFields);
-                idx++;
             } else {
+                Expression expression = selectItem.getExpression();
+                fields[idx] = zetaSQLFunction.computeForValue(expression, 
inputFields);
                 idx++;
             }
         }
         return fields;
     }
 
-    private int countColumnsSize(List<SelectItem> selectItems) {
+    private int countColumnsSize(List<SelectItem<?>> selectItems) {
         if (allColumnsCount != null) {
             return allColumnsCount;
         }
         int allColumnsCnt = 0;
         for (SelectItem selectItem : selectItems) {
-            if (selectItem instanceof AllColumns) {
+            if (selectItem.getExpression() instanceof AllColumns) {
                 allColumnsCnt++;
             }
         }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
index b3542663eb..7e84093bf4 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
@@ -30,7 +30,6 @@ import 
net.sf.jsqlparser.expression.operators.conditional.AndExpression;
 import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
 import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
 import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
 import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
 import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
 import net.sf.jsqlparser.expression.operators.relational.InExpression;
@@ -39,6 +38,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.LikeExpression;
 import net.sf.jsqlparser.expression.operators.relational.MinorThan;
 import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
 import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import 
net.sf.jsqlparser.expression.operators.relational.ParenthesedExpressionList;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -139,7 +139,8 @@ public class ZetaSQLFilter {
 
     private boolean inExpr(InExpression inExpression, Object[] inputFields) {
         Expression leftExpr = inExpression.getLeftExpression();
-        ExpressionList itemsList = (ExpressionList) 
inExpression.getRightItemsList();
+        ParenthesedExpressionList<Expression> itemsList =
+                (ParenthesedExpressionList) inExpression.getRightExpression();
         Object leftValue = zetaSQLFunction.computeForValue(leftExpr, 
inputFields);
         for (Expression exprItem : itemsList.getExpressions()) {
             Object rightValue = zetaSQLFunction.computeForValue(exprItem, 
inputFields);
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index ce02832712..8cbc3ed86a 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.transform.sql.zeta;
 
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -24,12 +27,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.transform.exception.TransformException;
 import org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
 import net.sf.jsqlparser.expression.BinaryExpression;
@@ -45,6 +50,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.SignedExpression;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TrimFunction;
 import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
@@ -54,14 +60,17 @@ import 
net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
 import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
 import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
 import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.select.LateralView;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import static java.util.UUID.randomUUID;
+import static 
org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
 
 public class ZetaSQLFunction {
     // ============================internal functions=====================
@@ -81,6 +90,8 @@ public class ZetaSQLFunction {
     public static final String INSERT = "INSERT";
     public static final String LOWER = "LOWER";
     public static final String LCASE = "LCASE";
+    public static final String BINARY = "BINARY";
+    public static final String BYTE = "BYTE";
     public static final String UPPER = "UPPER";
     public static final String UCASE = "UCASE";
     public static final String LEFT = "LEFT";
@@ -104,6 +115,7 @@ public class ZetaSQLFunction {
     public static final String SUBSTR = "SUBSTR";
     public static final String TO_CHAR = "TO_CHAR";
     public static final String TRANSLATE = "TRANSLATE";
+    public static final String SPLIT = "SPLIT";
 
     // -------------------------numeric functions----------------------------
     public static final String ABS = "ABS";
@@ -168,6 +180,10 @@ public class ZetaSQLFunction {
     public static final String YEAR = "YEAR";
     public static final String FROM_UNIXTIME = "FROM_UNIXTIME";
 
+    // -------------------------lateralView 
functions----------------------------
+    public static final String EXPLODE = "EXPLODE";
+    public static final String ARRAY = "ARRAY";
+
     // -------------------------system functions----------------------------
     public static final String COALESCE = "COALESCE";
     public static final String IFNULL = "IFNULL";
@@ -193,6 +209,18 @@ public class ZetaSQLFunction {
         if (expression instanceof NullValue) {
             return null;
         }
+        if (expression instanceof TrimFunction) {
+            TrimFunction function = (TrimFunction) expression;
+            Column column = (Column) function.getExpression();
+            List<Object> functionArgs = new ArrayList<>();
+            if (column != null) {
+                functionArgs.add(computeForValue(column, inputFields));
+                if (function.getFromExpression() != null) {
+                    functionArgs.add(((StringValue) 
function.getFromExpression()).getValue());
+                }
+            }
+            return executeFunctionExpr(TRIM, functionArgs);
+        }
         if (expression instanceof SignedExpression) {
             SignedExpression signedExpression = (SignedExpression) expression;
             if (signedExpression.getSign() == '-') {
@@ -280,7 +308,8 @@ public class ZetaSQLFunction {
         }
         if (expression instanceof Function) {
             Function function = (Function) expression;
-            ExpressionList expressionList = function.getParameters();
+            ExpressionList<Expression> expressionList =
+                    (ExpressionList<Expression>) function.getParameters();
             List<Object> functionArgs = new ArrayList<>();
             if (expressionList != null) {
                 for (Expression funcArgExpression : 
expressionList.getExpressions()) {
@@ -303,6 +332,7 @@ public class ZetaSQLFunction {
             Parenthesis parenthesis = (Parenthesis) expression;
             return computeForValue(parenthesis.getExpression(), inputFields);
         }
+        // bytes not supported at the moment,use BINARY instead.
         if (expression instanceof CaseExpression) {
             CaseExpression caseExpression = (CaseExpression) expression;
             final Object value = executeCaseExpr(caseExpression, inputFields);
@@ -412,6 +442,8 @@ public class ZetaSQLFunction {
                 return StringFunction.toChar(args);
             case TRANSLATE:
                 return StringFunction.translate(args);
+            case SPLIT:
+                return StringFunction.split(args);
             case ABS:
                 return NumericFunction.abs(args);
             case ACOS:
@@ -519,6 +551,8 @@ public class ZetaSQLFunction {
                 return SystemFunction.ifnull(args);
             case NULLIF:
                 return SystemFunction.nullif(args);
+            case ARRAY:
+                return SystemFunction.array(args);
             case UUID:
                 return randomUUID().toString();
             default:
@@ -551,12 +585,12 @@ public class ZetaSQLFunction {
     }
 
     public Object executeCastExpr(CastExpression castExpression, Object arg) {
-        String dataType = castExpression.getType().getDataType();
+        String dataType = castExpression.getColDataType().getDataType();
         List<Object> args = new ArrayList<>(2);
         args.add(arg);
         args.add(dataType.toUpperCase());
         if (dataType.equalsIgnoreCase("DECIMAL")) {
-            List<String> ps = 
castExpression.getType().getArgumentsStringList();
+            List<String> ps = 
castExpression.getColDataType().getArgumentsStringList();
             args.add(Integer.parseInt(ps.get(0)));
             args.add(Integer.parseInt(ps.get(1)));
         }
@@ -665,4 +699,190 @@ public class ZetaSQLFunction {
                 CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                 String.format("Unsupported SQL Expression: %s ", 
binaryExpression));
     }
+
+    public List<SeaTunnelRow> lateralView(
+            List<SeaTunnelRow> seaTunnelRows,
+            List<LateralView> lateralViews,
+            SeaTunnelRowType outRowType) {
+        for (LateralView lateralView : lateralViews) {
+            Function function = lateralView.getGeneratorFunction();
+            boolean isUsingOuter = lateralView.isUsingOuter();
+            String functionName = function.getName();
+            String alias = lateralView.getColumnAlias().getName();
+            if (EXPLODE.equalsIgnoreCase(functionName)) {
+                seaTunnelRows = explode(seaTunnelRows, function, outRowType, 
isUsingOuter, alias);
+            } else {
+                throw new SeaTunnelRuntimeException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        "Transform config error! UnSupport function:" + 
functionName);
+            }
+        }
+
+        return seaTunnelRows;
+    }
+
+    private List<SeaTunnelRow> explode(
+            List<SeaTunnelRow> seaTunnelRows,
+            Function lateralViewFunction,
+            SeaTunnelRowType outRowType,
+            boolean isUsingOuter,
+            String alias) {
+        ExpressionList<?> expressions = lateralViewFunction.getParameters();
+        int aliasFieldIndex = outRowType.indexOf(alias);
+        for (Expression expression : expressions) {
+            if (expression instanceof Column) {
+                String column = ((Column) expression).getColumnName();
+                List<SeaTunnelRow> next = new ArrayList<>();
+                for (SeaTunnelRow row : seaTunnelRows) {
+                    int fieldIndex = outRowType.indexOf(column);
+                    Object splitFieldValue = row.getField(fieldIndex);
+                    transformExplodeValue(
+                            splitFieldValue,
+                            outRowType,
+                            isUsingOuter,
+                            next,
+                            aliasFieldIndex,
+                            row,
+                            expression,
+                            true);
+                }
+                seaTunnelRows = next;
+            } else if (expression instanceof Function) {
+                List<SeaTunnelRow> next = new ArrayList<>();
+                for (SeaTunnelRow row : seaTunnelRows) {
+                    Object splitFieldValue = computeForValue(expression, 
row.getFields());
+                    transformExplodeValue(
+                            splitFieldValue,
+                            outRowType,
+                            isUsingOuter,
+                            next,
+                            aliasFieldIndex,
+                            row,
+                            expression,
+                            false);
+                }
+                seaTunnelRows = next;
+            }
+        }
+        return seaTunnelRows;
+    }
+
+    private void transformExplodeValue(
+            Object splitFieldValue,
+            SeaTunnelRowType outRowType,
+            boolean isUsingOuter,
+            List<SeaTunnelRow> next,
+            int aliasFieldIndex,
+            SeaTunnelRow row,
+            Expression expression,
+            boolean keepValueType) {
+        if (splitFieldValue == null) {
+            if (isUsingOuter) {
+                next.add(
+                        copySeaTunnelRowWithNewValue(
+                                outRowType.getTotalFields(), row, 
aliasFieldIndex, null));
+            }
+            return;
+        }
+        if (splitFieldValue.getClass().isArray()) {
+            if (ArrayUtils.isEmpty((Object[]) splitFieldValue)) {
+                if (isUsingOuter) {
+                    next.add(
+                            copySeaTunnelRowWithNewValue(
+                                    outRowType.getTotalFields(), row, 
aliasFieldIndex, null));
+                }
+                return;
+            }
+            for (Object fieldValue : (Object[]) splitFieldValue) {
+
+                if (!isUsingOuter && fieldValue == null) {
+                    continue;
+                }
+                Object value =
+                        fieldValue == null
+                                ? null
+                                : (keepValueType ? fieldValue : 
String.valueOf(fieldValue));
+                next.add(
+                        copySeaTunnelRowWithNewValue(
+                                outRowType.getTotalFields(), row, 
aliasFieldIndex, value));
+            }
+        } else {
+            throw new SeaTunnelRuntimeException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    "Transform config error! UnSupport explode function:"
+                            + ((Function) expression).getName());
+        }
+    }
+
+    private SeaTunnelRow copySeaTunnelRowWithNewValue(
+            int length, SeaTunnelRow row, int fieldIndex, Object fieldValue) {
+        Object[] fields = new Object[length];
+        System.arraycopy(row.getFields(), 0, fields, 0, 
row.getFields().length);
+        SeaTunnelRow outputRow = new SeaTunnelRow(fields);
+        outputRow.setRowKind(row.getRowKind());
+        outputRow.setTableId(row.getTableId());
+        outputRow.setField(fieldIndex, fieldValue);
+        return outputRow;
+    }
+
+    public SeaTunnelRowType lateralViewMapping(
+            String[] fieldNames,
+            SeaTunnelDataType<?>[] seaTunnelDataTypes,
+            List<LateralView> lateralViews,
+            List<String> inputColumnsMapping) {
+        for (LateralView lateralView : lateralViews) {
+            Function function = lateralView.getGeneratorFunction();
+            String functionName = function.getName();
+            String alias = lateralView.getColumnAlias().getName();
+            if (EXPLODE.equalsIgnoreCase(functionName)) {
+                ExpressionList<?> expressions = function.getParameters();
+                int aliasIndex = Arrays.asList(fieldNames).indexOf(alias);
+                for (Expression expression : expressions) {
+                    if (expression instanceof Column) {
+                        String column = ((Column) expression).getColumnName();
+                        int columnIndex = 
Arrays.asList(fieldNames).indexOf(column);
+                        if (columnIndex == -1) {
+                            throw new TransformException(
+                                    INPUT_FIELDS_NOT_FOUND,
+                                    "Lateral view field must be in select 
item:" + fieldNames);
+                        }
+                        ArrayType arrayType = (ArrayType) 
seaTunnelDataTypes[columnIndex];
+                        SeaTunnelDataType seaTunnelDataType =
+                                PhysicalColumn.of(
+                                                column,
+                                                arrayType.getElementType(),
+                                                200,
+                                                true,
+                                                "",
+                                                "")
+                                        .getDataType();
+                        if (aliasIndex == -1) {
+                            fieldNames = ArrayUtils.add(fieldNames, alias);
+                            seaTunnelDataTypes =
+                                    ArrayUtils.add(seaTunnelDataTypes, 
seaTunnelDataType);
+                            inputColumnsMapping.add(alias);
+                        } else {
+                            seaTunnelDataTypes[columnIndex] = 
seaTunnelDataType;
+                        }
+                    } else {
+                        // default string type
+                        SeaTunnelDataType seaTunnelDataType =
+                                PhysicalColumn.of(alias, 
BasicType.STRING_TYPE, 10L, true, "", "")
+                                        .getDataType();
+                        if (aliasIndex == -1) {
+                            fieldNames = ArrayUtils.add(fieldNames, alias);
+                            seaTunnelDataTypes =
+                                    ArrayUtils.add(seaTunnelDataTypes, 
seaTunnelDataType);
+                            inputColumnsMapping.add(alias);
+                        }
+                    }
+                }
+            } else {
+                throw new SeaTunnelRuntimeException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        "Transform config error! UnSupport function:" + 
functionName);
+            }
+        }
+        return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 3d9715561c..f07f27ebbb 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -43,6 +43,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.SignedExpression;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TrimFunction;
 import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
@@ -71,6 +72,7 @@ public class ZetaSQLType {
     public static final String LONG = "LONG";
     public static final String BYTE = "BYTE";
     public static final String BYTES = "BYTES";
+    public static final String BINARY = "BINARY";
     public static final String DOUBLE = "DOUBLE";
     public static final String FLOAT = "FLOAT";
     public static final String TIMESTAMP = "TIMESTAMP";
@@ -161,6 +163,9 @@ public class ZetaSQLType {
         if (expression instanceof Function) {
             return getFunctionType((Function) expression);
         }
+        if (expression instanceof TrimFunction) {
+            return BasicType.STRING_TYPE;
+        }
         if (expression instanceof TimeKeyExpression) {
             return getTimeKeyExprType((TimeKeyExpression) expression);
         }
@@ -191,6 +196,7 @@ public class ZetaSQLType {
         if (expression instanceof CastExpression) {
             return getCastType((CastExpression) expression);
         }
+
         if (expression instanceof BinaryExpression) {
             BinaryExpression binaryExpression = (BinaryExpression) expression;
             SeaTunnelDataType<?> leftType = 
getExpressionType(binaryExpression.getLeftExpression());
@@ -314,10 +320,10 @@ public class ZetaSQLType {
     }
 
     private SeaTunnelDataType<?> getCastType(CastExpression castExpression) {
-        String dataType = castExpression.getType().getDataType();
+        String dataType = castExpression.getColDataType().getDataType();
         switch (dataType.toUpperCase()) {
             case DECIMAL:
-                List<String> ps = 
castExpression.getType().getArgumentsStringList();
+                List<String> ps = 
castExpression.getColDataType().getArgumentsStringList();
                 return new DecimalType(Integer.parseInt(ps.get(0)), 
Integer.parseInt(ps.get(1)));
             case VARCHAR:
             case STRING:
@@ -331,6 +337,7 @@ public class ZetaSQLType {
             case BYTE:
                 return BasicType.BYTE_TYPE;
             case BYTES:
+            case BINARY:
                 return PrimitiveByteArrayType.INSTANCE;
             case DOUBLE:
                 return BasicType.DOUBLE_TYPE;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
index d665117bfd..2bf71e3b60 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/StringFunction.java
@@ -21,6 +21,8 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 import org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction;
 
+import org.apache.groovy.parser.antlr4.util.StringUtils;
+
 import java.nio.charset.StandardCharsets;
 import java.time.temporal.Temporal;
 import java.util.Arrays;
@@ -332,6 +334,18 @@ public class StringFunction {
         return trim(arg, true, true, sp);
     }
 
+    public static String[] split(List<Object> args) {
+        String arg = (String) args.get(0);
+        if (StringUtils.isEmpty(arg)) {
+            return null;
+        }
+        String delimiter = "";
+        if (args.size() >= 2) {
+            delimiter = (String) args.get(1);
+        }
+        return arg.split(delimiter);
+    }
+
     public static String trim(String s, boolean leading, boolean trailing, 
String sp) {
         char space = sp == null || sp.isEmpty() ? ' ' : sp.charAt(0);
         int begin = 0, end = s.length();
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
index 0b616b0fbe..3968fbf2e7 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
@@ -22,6 +22,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.google.common.collect.Lists;
+
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.charset.StandardCharsets;
@@ -66,6 +70,13 @@ public class SystemFunction {
         return v1;
     }
 
+    public static Object array(List<Object> args) {
+        if (CollectionUtils.isNotEmpty(args)) {
+            return args.toArray();
+        }
+        return Lists.newArrayList();
+    }
+
     public static Object castAs(Object arg, SeaTunnelDataType<?> type) {
         final ArrayList<Object> args = new ArrayList<>(4);
         args.add(arg);
@@ -100,6 +111,7 @@ public class SystemFunction {
             case "BYTE":
                 return Byte.parseByte(v1.toString());
             case "BYTES":
+            case "BINARY":
                 return v1.toString().getBytes(StandardCharsets.UTF_8);
             case "DOUBLE":
                 return Double.parseDouble(v1.toString());
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index ff253eac21..fcf14cc7b9 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Objects;
 
 public class SQLTransformTest {
@@ -168,11 +169,11 @@ public class SQLTransformTest {
                                 "select id, trim(`apply`) as `apply` from test 
where `apply` = 'a'"));
         SQLTransform sqlTransform = new SQLTransform(config, table);
         TableSchema tableSchema = sqlTransform.transformTableSchema();
-        SeaTunnelRow result =
+        List<SeaTunnelRow> result =
                 sqlTransform.transformRow(
                         new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("a")}));
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
-        Assertions.assertEquals("a", result.getField(1));
+        Assertions.assertEquals("a", result.get(0).getField(1));
         result =
                 sqlTransform.transformRow(
                         new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("b")}));
@@ -191,7 +192,7 @@ public class SQLTransformTest {
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
         Assertions.assertEquals(
                 BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
-        Assertions.assertEquals("a", result.getField(1));
+        Assertions.assertEquals("a", result.get(0).getField(1));
 
         table =
                 CatalogTableUtil.getCatalogTable(
@@ -211,7 +212,7 @@ public class SQLTransformTest {
                         new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
Long.valueOf(1)}));
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
         Assertions.assertEquals(BasicType.LONG_TYPE, 
tableSchema.getColumns().get(1).getDataType());
-        Assertions.assertEquals(Long.valueOf(2), result.getField(1));
+        Assertions.assertEquals(Long.valueOf(2), result.get(0).getField(1));
         result =
                 sqlTransform.transformRow(
                         new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
Long.valueOf(0)}));
@@ -243,7 +244,7 @@ public class SQLTransformTest {
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
         Assertions.assertEquals(
                 BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
-        Assertions.assertEquals("a", result.getField(1));
+        Assertions.assertEquals("a", result.get(0).getField(1));
         result =
                 sqlTransform.transformRow(
                         new SeaTunnelRow(
@@ -278,6 +279,6 @@ public class SQLTransformTest {
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
         Assertions.assertEquals(
                 BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
-        Assertions.assertEquals("a", result.getField(1));
+        Assertions.assertEquals("a", result.get(0).getField(1));
     }
 }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
index 70b744a04c..c062164608 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
@@ -48,7 +48,7 @@ public class DateTimeFunctionTest {
                 null,
                 rowType,
                 "select from_unixtime(unixtime,'yyyy-MM-dd') as ts from test");
-        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, 
rowType).get(0);
         Object field = outRow.getField(0);
         Assertions.assertNotNull(field.toString());
 
@@ -58,7 +58,7 @@ public class DateTimeFunctionTest {
                 null,
                 rowType,
                 "select from_unixtime(unixtime,'yyyy-MM-dd HH:mm:ss','UTC+6') 
as ts from test");
-        SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow);
+        SeaTunnelRow outRow1 = sqlEngine.transformBySQL(inputRow, 
rowType).get(0);
         Object field1 = outRow1.getField(0);
         Assertions.assertEquals("2023-01-01 10:00:00", field1.toString());
     }
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 1f49332ef9..b2012936bd 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -30,7 +30,7 @@ seatunnel-jackson-2.3.9-SNAPSHOT-optional.jar
 seatunnel-guava-2.3.9-SNAPSHOT-optional.jar
 seatunnel-hazelcast-shade-2.3.9-SNAPSHOT-optional.jar
 slf4j-api-1.7.25.jar
-jsqlparser-4.5.jar
+jsqlparser-4.9.jar
 animal-sniffer-annotations-1.17.jar
 checker-qual-3.10.0.jar
 error_prone_annotations-2.2.0.jar


Reply via email to