This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 749826cd80 [Feature][Transform-V2][SQL] Support 'select *' and 'like' 
clause for SQL Transform plugin (#4991)
749826cd80 is described below

commit 749826cd80f3a69817bf27267392db8b7d1322a9
Author: Marvin <[email protected]>
AuthorDate: Thu Jun 29 22:37:09 2023 +0800

    [Feature][Transform-V2][SQL] Support 'select *' and 'like' clause for SQL 
Transform plugin (#4991)
    
    Co-authored-by: mcy <[email protected]>
---
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |   3 +
 .../resources/sql_transform/criteria_filter.conf   |   2 +-
 .../resources/sql_transform/sql_all_columns.conf   | 124 +++++++++++++++++++++
 .../transform/sql/zeta/ZetaSQLEngine.java          |  95 ++++++++++++----
 .../transform/sql/zeta/ZetaSQLFilter.java          |  23 +++-
 5 files changed, 222 insertions(+), 25 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 3dc770c1d8..d54a2addaf 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -49,5 +49,8 @@ public class TestSQLIT extends TestSuiteBase {
         Container.ExecResult sqlCriteriaFilter =
                 container.executeJob("/sql_transform/criteria_filter.conf");
         Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode());
+        Container.ExecResult sqlAllColumns =
+                container.executeJob("/sql_transform/sql_all_columns.conf");
+        Assertions.assertEquals(0, sqlAllColumns.getExitCode());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
index a8c51a8749..c1e9ecc3b0 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
@@ -47,7 +47,7 @@ transform {
   Sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin 
Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and 
id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email 
is null and id<4 and id<=4"
+    query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin 
Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and 
id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email 
is null and id<4 and id<=4 and name like '%Din_'"
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf
new file mode 100644
index 0000000000..0dfa9db5a7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        c_timestamp = "timestamp"
+        c_date = "date"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_decimal = "decimal(30, 8)"
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "select *, id as id_ from fake"
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+      source_table_name = "fake1"
+      rules =
+        {
+          row_rules = [
+            {
+              rule_type = MIN_ROW
+              rule_value = 100
+            }
+          ]
+          field_rules = [
+            {
+              field_name = id
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = age
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = c_timestamp
+              field_type = timestamp
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = c_date
+              field_type = date
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = id_
+              field_type = int
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }
+          ]
+        }
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index fa491e7e6e..55fbe04cf1 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -54,6 +54,8 @@ public class ZetaSQLEngine implements SQLEngine {
     private ZetaSQLFilter zetaSQLFilter;
     private ZetaSQLType zetaSQLType;
 
+    private Integer allColumnsCount = null;
+
     public ZetaSQLEngine() {}
 
     @Override
@@ -131,11 +133,11 @@ public class ZetaSQLEngine implements SQLEngine {
                 throw new IllegalArgumentException("Unsupported LIMIT,OFFSET 
syntax");
             }
 
-            for (SelectItem selectItem : selectBody.getSelectItems()) {
-                if (selectItem instanceof AllColumns) {
-                    throw new IllegalArgumentException("Unsupported all 
columns select syntax");
-                }
-            }
+            // for (SelectItem selectItem : selectBody.getSelectItems()) {
+            //     if (selectItem instanceof AllColumns) {
+            //         throw new IllegalArgumentException("Unsupported all 
columns select syntax");
+            //     }
+            // }
         } catch (Exception e) {
             throw new TransformException(
                     CommonErrorCode.UNSUPPORTED_OPERATION,
@@ -147,10 +149,13 @@ public class ZetaSQLEngine implements SQLEngine {
     public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
         List<SelectItem> selectItems = selectBody.getSelectItems();
 
-        String[] fieldNames = new String[selectItems.size()];
-        SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType<?>[selectItems.size()];
+        // number of output columns after expanding each '*'
+        int columnsSize = countColumnsSize(selectItems);
+
+        String[] fieldNames = new String[columnsSize];
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType<?>[columnsSize];
         if (inputColumnsMapping != null) {
-            for (int i = 0; i < selectItems.size(); i++) {
+            for (int i = 0; i < columnsSize; i++) {
                 inputColumnsMapping.add(null);
             }
         }
@@ -158,29 +163,41 @@ public class ZetaSQLEngine implements SQLEngine {
         List<String> inputColumnNames =
                 
Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());
 
-        for (int i = 0; i < selectItems.size(); i++) {
-            SelectItem selectItem = selectItems.get(i);
-            if (selectItem instanceof SelectExpressionItem) {
+        int idx = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
+                    fieldNames[idx] = inputRowType.getFieldName(i);
+                    seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
+                    if (inputColumnsMapping != null) {
+                        inputColumnsMapping.set(idx, 
inputRowType.getFieldName(i));
+                    }
+                    idx++;
+                }
+            } else if (selectItem instanceof SelectExpressionItem) {
                 SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
                 Expression expression = expressionItem.getExpression();
 
                 if (expressionItem.getAlias() != null) {
-                    fieldNames[i] = expressionItem.getAlias().getName();
+                    fieldNames[idx] = expressionItem.getAlias().getName();
                 } else {
                     if (expression instanceof Column) {
-                        fieldNames[i] = ((Column) expression).getColumnName();
+                        fieldNames[idx] = ((Column) 
expression).getColumnName();
                     } else {
-                        fieldNames[i] = expression.toString();
+                        fieldNames[idx] = expression.toString();
                     }
                 }
 
                 if (inputColumnsMapping != null
                         && expression instanceof Column
                         && inputColumnNames.contains(((Column) 
expression).getColumnName())) {
-                    inputColumnsMapping.set(i, ((Column) 
expression).getColumnName());
+                    inputColumnsMapping.set(idx, ((Column) 
expression).getColumnName());
                 }
 
-                seaTunnelDataTypes[i] = 
zetaSQLType.getExpressionType(expression);
+                seaTunnelDataTypes[idx] = 
zetaSQLType.getExpressionType(expression);
+                idx++;
+            } else {
+                idx++;
             }
         }
         return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
@@ -214,13 +231,47 @@ public class ZetaSQLEngine implements SQLEngine {
 
     private Object[] project(Object[] inputFields) {
         List<SelectItem> selectItems = selectBody.getSelectItems();
-        Object[] fields = new Object[selectItems.size()];
-        for (int i = 0; i < selectItems.size(); i++) {
-            SelectItem selectItem = selectItems.get(i);
-            SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
-            Expression expression = expressionItem.getExpression();
-            fields[i] = zetaSQLFunction.computeForValue(expression, 
inputFields);
+
+        int columnsSize = countColumnsSize(selectItems);
+
+        Object[] fields = new Object[columnsSize];
+        for (int i = 0; i < columnsSize; i++) {
+            fields[i] = null;
+        }
+
+        int idx = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                for (Object inputField : inputFields) {
+                    fields[idx] = inputField;
+                    idx++;
+                }
+            } else if (selectItem instanceof SelectExpressionItem) {
+                SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
+                Expression expression = expressionItem.getExpression();
+                fields[idx] = zetaSQLFunction.computeForValue(expression, 
inputFields);
+                idx++;
+            } else {
+                idx++;
+            }
         }
         return fields;
     }
+
+    private int countColumnsSize(List<SelectItem> selectItems) {
+        if (allColumnsCount != null) {
+            return allColumnsCount;
+        }
+        int allColumnsCnt = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                allColumnsCnt++;
+            }
+        }
+        allColumnsCount =
+                selectItems.size()
+                        + inputRowType.getFieldNames().length * allColumnsCnt
+                        - allColumnsCnt;
+        return allColumnsCount;
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
index f444764a07..77b59c51bc 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
@@ -42,6 +42,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class ZetaSQLFilter {
     private final ZetaSQLFunction zetaSQLFunction;
@@ -153,8 +155,25 @@ public class ZetaSQLFilter {
      * @return filter result
      */
     private boolean likeExpr(LikeExpression likeExpression, Object[] 
inputFields) {
-        throw new TransformException(
-                CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported [LIKE] 
filter expression yet");
+        Expression leftExpr = likeExpression.getLeftExpression();
+        Object leftVal = zetaSQLFunction.computeForValue(leftExpr, 
inputFields);
+        if (leftVal == null) {
+            return false;
+        }
+        Expression rightExpr = likeExpression.getRightExpression();
+        Object rightVal = zetaSQLFunction.computeForValue(rightExpr, 
inputFields);
+        if (rightVal == null) {
+            return false;
+        }
+
+        String regex = rightVal.toString().replace("%", ".*").replace("_", 
".");
+        if (regex.startsWith("'") && regex.endsWith("'")) {
+            regex = regex.substring(0, regex.length() - 1).substring(1);
+        }
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(leftVal.toString());
+
+        return matcher.matches();
     }
 
     private Pair<Object, Object> executeComparisonOperator(

Reply via email to