This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fa11abb7c2 [Improve][SQL] Support use catalogTableName as SQL
expression (#5273)
fa11abb7c2 is described below
commit fa11abb7c2990d287bc60fc5761356352bbfa6e1
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 11 13:49:33 2023 +0800
[Improve][SQL] Support use catalogTableName as SQL expression (#5273)
---
.../seatunnel/engine/e2e/JobExecutionIT.java | 1 +
.../apache/seatunnel/transform/sql/SQLEngine.java | 6 ++-
.../seatunnel/transform/sql/SQLTransform.java | 6 ++-
.../transform/sql/zeta/ZetaSQLEngine.java | 13 +++++-
.../transform/sql/zeta/ZetaSQLEngineTest.java | 54 ++++++++++++++++++++++
5 files changed, 76 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 501e763e3c..d0467e53cb 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
index b1e734c31e..6dfaddca00 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java
@@ -23,7 +23,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import java.util.List;
public interface SQLEngine {
- void init(String inputTableName, SeaTunnelRowType inputRowType, String
sql);
+ void init(
+ String inputTableName,
+ String catalogTableName,
+ SeaTunnelRowType inputRowType,
+ String sql);
SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index 20a07dcee0..9b21c4b6f5 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -115,7 +115,11 @@ public class SQLTransform extends
AbstractCatalogSupportTransform {
@Override
public void open() {
sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
- sqlEngine.init(inputTableName, inputRowType, query);
+ sqlEngine.init(
+ inputTableName,
+ inputCatalogTable != null ?
inputCatalogTable.getTableId().getTableName() : null,
+ inputRowType,
+ query);
}
private void tryOpen() {
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 55fbe04cf1..2f01fe3af9 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -37,6 +37,8 @@ import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -45,6 +47,7 @@ import java.util.stream.Collectors;
public class ZetaSQLEngine implements SQLEngine {
private String inputTableName;
+ @Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;
private String sql;
@@ -59,8 +62,13 @@ public class ZetaSQLEngine implements SQLEngine {
public ZetaSQLEngine() {}
@Override
- public void init(String inputTableName, SeaTunnelRowType inputRowType,
String sql) {
+ public void init(
+ String inputTableName,
+ String catalogTableName,
+ SeaTunnelRowType inputRowType,
+ String sql) {
this.inputTableName = inputTableName;
+ this.catalogTableName = catalogTableName;
this.inputRowType = inputRowType;
this.sql = sql;
@@ -109,7 +117,8 @@ public class ZetaSQLEngine implements SQLEngine {
throw new IllegalArgumentException("Unsupported table
alias name syntax");
}
String tableName = table.getName();
- if (!inputTableName.equalsIgnoreCase(tableName)) {
+ if (!inputTableName.equalsIgnoreCase(tableName)
+ && !tableName.equalsIgnoreCase(catalogTableName)) {
throw new IllegalArgumentException(
String.format("Table name: %s not found",
tableName));
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
new file mode 100644
index 0000000000..94e1060af8
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.exception.TransformException;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ZetaSQLEngineTest {
+ /** Checks that init accepts a query on the input table name or the catalog table name only. */
+ @Test
+ public void testCatalogNameAndSourceTableNameBothSupport() {
+ // init arguments: input table name, catalog table name (null when absent), row type, SQL.
+ SQLEngine sqlEngine =
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "age"},
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.INT_TYPE
+ });
+ sqlEngine.init("test", null, rowType, "select * from test");
+ sqlEngine.init("test", "nameFromCatalog", rowType, "select * from
test");
+ sqlEngine.init("test", "nameFromCatalog", rowType, "select * from
nameFromCatalog");
+ // A FROM table that matches neither name must make init throw TransformException.
+ Assertions.assertThrows(
+ TransformException.class,
+ () -> sqlEngine.init("test", "nameFromCatalog", rowType,
"select * from unknown"));
+ Assertions.assertThrows(
+ TransformException.class,
+ () -> sqlEngine.init("test", null, rowType, "select * from
unknown"));
+ }
+}