This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fa11abb7c2 [Improve][SQL] Support use catalogTableName as SQL 
expression (#5273)
fa11abb7c2 is described below

commit fa11abb7c2990d287bc60fc5761356352bbfa6e1
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 11 13:49:33 2023 +0800

    [Improve][SQL] Support use catalogTableName as SQL expression (#5273)
---
 .../seatunnel/engine/e2e/JobExecutionIT.java       |  1 +
 .../apache/seatunnel/transform/sql/SQLEngine.java  |  6 ++-
 .../seatunnel/transform/sql/SQLTransform.java      |  6 ++-
 .../transform/sql/zeta/ZetaSQLEngine.java          | 13 +++++-
 .../transform/sql/zeta/ZetaSQLEngineTest.java      | 54 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 501e763e3c..d0467e53cb 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index b1e734c31e..6dfaddca00 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -23,7 +23,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import java.util.List;
 
 public interface SQLEngine {
-    void init(String inputTableName, SeaTunnelRowType inputRowType, String 
sql);
+    void init(
+            String inputTableName,
+            String catalogTableName,
+            SeaTunnelRowType inputRowType,
+            String sql);
 
     SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 20a07dcee0..9b21c4b6f5 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -115,7 +115,11 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
     @Override
     public void open() {
         sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
-        sqlEngine.init(inputTableName, inputRowType, query);
+        sqlEngine.init(
+                inputTableName,
+                inputCatalogTable != null ? 
inputCatalogTable.getTableId().getTableName() : null,
+                inputRowType,
+                query);
     }
 
     private void tryOpen() {
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 55fbe04cf1..2f01fe3af9 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -37,6 +37,8 @@ import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -45,6 +47,7 @@ import java.util.stream.Collectors;
 
 public class ZetaSQLEngine implements SQLEngine {
     private String inputTableName;
+    @Nullable private String catalogTableName;
     private SeaTunnelRowType inputRowType;
 
     private String sql;
@@ -59,8 +62,13 @@ public class ZetaSQLEngine implements SQLEngine {
     public ZetaSQLEngine() {}
 
     @Override
-    public void init(String inputTableName, SeaTunnelRowType inputRowType, 
String sql) {
+    public void init(
+            String inputTableName,
+            String catalogTableName,
+            SeaTunnelRowType inputRowType,
+            String sql) {
         this.inputTableName = inputTableName;
+        this.catalogTableName = catalogTableName;
         this.inputRowType = inputRowType;
         this.sql = sql;
 
@@ -109,7 +117,8 @@ public class ZetaSQLEngine implements SQLEngine {
                     throw new IllegalArgumentException("Unsupported table 
alias name syntax");
                 }
                 String tableName = table.getName();
-                if (!inputTableName.equalsIgnoreCase(tableName)) {
+                if (!inputTableName.equalsIgnoreCase(tableName)
+                        && !tableName.equalsIgnoreCase(catalogTableName)) {
                     throw new IllegalArgumentException(
                             String.format("Table name: %s not found", 
tableName));
                 }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
new file mode 100644
index 0000000000..94e1060af8
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ZetaSQLEngineTest {
+
+    @Test
+    public void testCatalogNameAndSourceTableNameBothSupport() {
+
+        SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "age"},
+                        new SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.INT_TYPE
+                        });
+        sqlEngine.init("test", null, rowType, "select * from test");
+        sqlEngine.init("test", "nameFromCatalog", rowType, "select * from 
test");
+        sqlEngine.init("test", "nameFromCatalog", rowType, "select * from 
nameFromCatalog");
+
+        Assertions.assertThrows(
+                TransformException.class,
+                () -> sqlEngine.init("test", "nameFromCatalog", rowType, 
"select * from unknown"));
+        Assertions.assertThrows(
+                TransformException.class,
+                () -> sqlEngine.init("test", null, rowType, "select * from 
unknown"));
+    }
+}

Reply via email to