Hisoka-X commented on code in PR #7928:
URL: https://github.com/apache/seatunnel/pull/7928#discussion_r1829201727


##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf:
##########
@@ -49,7 +49,7 @@ transform {
   Sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as bytes) as c7, name as `apply` from fake"
+    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as BINARY) as c7, name as apply from fake"

Review Comment:
   why change this?



##########
docs/en/transform-v2/sql-functions.md:
##########
@@ -984,3 +984,16 @@ Example:
 
 select UUID() as seatunnel_uuid
 
+
+### LATERAL VIEW 
+#### EXPLODE
+
+explode array column to rows.
+OUTER EXPLODE will return NULL, while array is NULL or empty
+
+```
+SELECT * FROM fake 
+       LATERAL VIEW EXPLODE ( SPILT ( NAME, ',' ) ) AS NAME 

Review Comment:
   where is doc of `spilt` function?



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -193,6 +207,16 @@ public Object computeForValue(Expression expression, 
Object[] inputFields) {
         if (expression instanceof NullValue) {
             return null;
         }
+        if (expression instanceof TrimFunction) {

Review Comment:
   what's for?



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +690,153 @@ private Object executeBinaryExpr(BinaryExpression 
binaryExpression, Object[] inp
                 CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                 String.format("Unsupported SQL Expression: %s ", 
binaryExpression));
     }
+
+    public List<SeaTunnelRow> lateralView(
+            List<SeaTunnelRow> seaTunnelRows,
+            List<LateralView> lateralViews,
+            SeaTunnelRowType outRowType) {
+        for (LateralView lateralView : lateralViews) {
+            Function function = lateralView.getGeneratorFunction();
+            boolean isUsingOuter = lateralView.isUsingOuter();
+            String functionName = function.getName();
+            if (EXPLODE.equalsIgnoreCase(functionName)) {
+                seaTunnelRows = explode(seaTunnelRows, function, outRowType, 
isUsingOuter);
+            } else {
+                throw new SeaTunnelRuntimeException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        "Transform config error! UnSupport function:" + 
functionName);
+            }
+        }
+
+        return seaTunnelRows;
+    }
+
+    private List<SeaTunnelRow> explode(
+            List<SeaTunnelRow> seaTunnelRows,
+            Function lateralViewFunction,
+            SeaTunnelRowType outRowType,
+            boolean isUsingOuter) {
+        ExpressionList<?> expressions = lateralViewFunction.getParameters();
+        for (Expression expression : expressions) {
+            if (expression instanceof Column) {
+                String column = ((Column) expression).getColumnName();
+                List<SeaTunnelRow> next = new ArrayList<>();
+                for (SeaTunnelRow row : seaTunnelRows) {
+                    int fieldIndex = outRowType.indexOf(column);
+                    Object splitFieldValue = row.getField(fieldIndex);
+                    if (splitFieldValue == null) {
+                        continue;
+                    }
+                    if (splitFieldValue instanceof Object[]) {
+                        Object[] rowList = (Object[]) splitFieldValue;
+                        if (ArrayUtils.isEmpty(rowList) && isUsingOuter) {
+                            SeaTunnelRow outputRow = row.copy();
+                            outputRow.setField(fieldIndex, null);
+                            next.add(outputRow);
+                        } else {
+                            for (Object fieldValue : rowList) {
+                                SeaTunnelRow outputRow = row.copy();
+                                outputRow.setField(fieldIndex, fieldValue);
+                                next.add(outputRow);
+                            }
+                        }
+                    }
+                }
+                seaTunnelRows = next;
+            }
+            if (expression instanceof Function) {
+                Function function = (Function) expression;
+                String functionName = function.getName();
+                if (SPILT.equalsIgnoreCase(functionName)) {
+                    ExpressionList expressionList = function.getParameters();
+                    String column = ((Column) 
expressionList.get(0)).getColumnName();
+                    String delimiter = ((StringValue) 
expressionList.get(1)).getValue();
+                    List<SeaTunnelRow> next = new ArrayList<>();
+                    for (SeaTunnelRow row : seaTunnelRows) {
+
+                        int fieldIndex = outRowType.indexOf(column);
+                        Object splitFieldValue = row.getField(fieldIndex);
+                        if (splitFieldValue == null) {
+                            continue;
+                        }
+                        String[] splitFieldValues = 
splitFieldValue.toString().split(delimiter);
+                        for (String fieldValue : splitFieldValues) {
+                            SeaTunnelRow outputRow = row.copy();
+                            outputRow.setField(fieldIndex, fieldValue);
+                            next.add(outputRow);
+                        }
+                    }
+                    seaTunnelRows = next;
+
+                } else {
+                    throw new SeaTunnelRuntimeException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                            "Transform config error! UnSupport function:" + 
functionName);
+                }
+            }
+        }
+        return seaTunnelRows;
+    }
+
+    public SeaTunnelRowType lateralViewMapping(
+            String[] fieldNames,
+            SeaTunnelDataType<?>[] seaTunnelDataTypes,
+            List<LateralView> lateralViews) {
+        if (CollectionUtils.isEmpty(lateralViews)) {
+            return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+        }

Review Comment:
   I think this part should not including in `lateralViewMapping` method. Keep 
same as 
https://github.com/apache/seatunnel/pull/7928/files#diff-af2fe8799f25cd56d819141a2d8342e5ac0f2bc7e51220b8f4b3a2a0ada0c44dR241-R246



##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/explode_transform_sql.conf:
##########
@@ -0,0 +1,98 @@
+#

Review Comment:
   this file useless?



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +690,153 @@ private Object executeBinaryExpr(BinaryExpression 
binaryExpression, Object[] inp
                 CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                 String.format("Unsupported SQL Expression: %s ", 
binaryExpression));
     }
+
+    public List<SeaTunnelRow> lateralView(
+            List<SeaTunnelRow> seaTunnelRows,
+            List<LateralView> lateralViews,
+            SeaTunnelRowType outRowType) {
+        for (LateralView lateralView : lateralViews) {
+            Function function = lateralView.getGeneratorFunction();
+            boolean isUsingOuter = lateralView.isUsingOuter();
+            String functionName = function.getName();
+            if (EXPLODE.equalsIgnoreCase(functionName)) {
+                seaTunnelRows = explode(seaTunnelRows, function, outRowType, 
isUsingOuter);
+            } else {
+                throw new SeaTunnelRuntimeException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        "Transform config error! UnSupport function:" + 
functionName);
+            }
+        }
+
+        return seaTunnelRows;
+    }
+
+    private List<SeaTunnelRow> explode(
+            List<SeaTunnelRow> seaTunnelRows,
+            Function lateralViewFunction,
+            SeaTunnelRowType outRowType,
+            boolean isUsingOuter) {
+        ExpressionList<?> expressions = lateralViewFunction.getParameters();
+        for (Expression expression : expressions) {
+            if (expression instanceof Column) {
+                String column = ((Column) expression).getColumnName();
+                List<SeaTunnelRow> next = new ArrayList<>();
+                for (SeaTunnelRow row : seaTunnelRows) {
+                    int fieldIndex = outRowType.indexOf(column);
+                    Object splitFieldValue = row.getField(fieldIndex);
+                    if (splitFieldValue == null) {
+                        continue;
+                    }
+                    if (splitFieldValue instanceof Object[]) {
+                        Object[] rowList = (Object[]) splitFieldValue;
+                        if (ArrayUtils.isEmpty(rowList) && isUsingOuter) {
+                            SeaTunnelRow outputRow = row.copy();
+                            outputRow.setField(fieldIndex, null);
+                            next.add(outputRow);
+                        } else {
+                            for (Object fieldValue : rowList) {
+                                SeaTunnelRow outputRow = row.copy();
+                                outputRow.setField(fieldIndex, fieldValue);
+                                next.add(outputRow);
+                            }
+                        }
+                    }
+                }
+                seaTunnelRows = next;
+            }
+            if (expression instanceof Function) {
+                Function function = (Function) expression;
+                String functionName = function.getName();
+                if (SPILT.equalsIgnoreCase(functionName)) {

Review Comment:
   so we can not directly use `split` function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to