This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5a3107442a3 [feature](tvf) support query table value function (#34516)
(#34640)
5a3107442a3 is described below
commit 5a3107442a3458116a5f3f780863d51d084f079d
Author: zy-kkk <[email protected]>
AuthorDate: Fri May 10 14:29:17 2024 +0800
[feature](tvf) support query table value function (#34516) (#34640)
This PR supports a Table Value Function called `Query`. He can push a query
directly to the catalog source for execution by specifying `catalog` and
`query` without parsing by Doris. Doris only receives the results returned by
the query.
Currently only JDBC Catalog is supported.
Example:
```
Doris > desc function query('catalog' = 'mysql','query' = 'select count(*)
as cnt from test.test');
+-------+--------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------+------+------+---------+-------+
| cnt | BIGINT | Yes | true | NULL | NONE |
+-------+--------+------+------+---------+-------+
Doris > select * from query('catalog' = 'mysql','query' = 'select count(*)
as cnt from test.test');
+----------+
| cnt |
+----------+
| 30000000 |
+----------+
```
---
.../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 45 ++++++++---
.../doris/datasource/jdbc/JdbcExternalTable.java | 30 ++-----
.../doris/datasource/jdbc/client/JdbcClient.java | 54 +++++++++++++
.../doris/datasource/jdbc/source/JdbcScanNode.java | 38 ++++++---
.../trees/expressions/functions/table/Query.java | 56 +++++++++++++
.../visitor/TableValuedFunctionVisitor.java | 5 ++
.../tablefunction/JdbcQueryTableValueFunction.java | 58 ++++++++++++++
.../tablefunction/QueryTableValueFunction.java | 91 ++++++++++++++++++++++
.../doris/tablefunction/TableValuedFunctionIf.java | 2 +
.../external_table_p0/jdbc/test_jdbc_query_tvf.out | 44 +++++++++++
.../jdbc/test_jdbc_query_tvf.groovy | 49 ++++++++++++
12 files changed, 432 insertions(+), 44 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index 9986ce71885..3becd2e102b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -29,6 +29,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
@@ -55,7 +56,8 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
- tableValued(Tasks.class, "tasks")
+ tableValued(Tasks.class, "tasks"),
+ tableValued(Query.class, "query")
);
public static final BuiltinTableValuedFunctions INSTANCE = new
BuiltinTableValuedFunctions();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index fd0d966dd54..1a8cde4d03b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.jdbc;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
@@ -288,6 +289,34 @@ public class JdbcExternalCatalog extends ExternalCatalog {
jdbcClient.executeStmt(stmt);
}
+ /**
+ * Get columns from query
+ *
+ * @param query, the query string
+ * @return the columns
+ */
+ public List<Column> getColumnsFromQuery(String query) {
+ makeSureInitialized();
+ return jdbcClient.getColumnsFromQuery(query);
+ }
+
+ public void configureJdbcTable(JdbcTable jdbcTable, String tableName) {
+ jdbcTable.setCatalogId(this.getId());
+ jdbcTable.setExternalTableName(tableName);
+ jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
+ jdbcTable.setJdbcUrl(this.getJdbcUrl());
+ jdbcTable.setJdbcUser(this.getJdbcUser());
+ jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
+ jdbcTable.setDriverClass(this.getDriverClass());
+ jdbcTable.setDriverUrl(this.getDriverUrl());
+ jdbcTable.setResourceName(this.getResource());
+ jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
+ jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
+
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
+
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
+ jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
+ }
+
private void testJdbcConnection(boolean isReplay) throws DdlException {
if (FeConstants.runningUnitTest) {
// skip test connection in unit test
@@ -352,19 +381,11 @@ public class JdbcExternalCatalog extends ExternalCatalog {
private JdbcTable getTestConnectionJdbcTable() throws DdlException {
JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection",
Lists.newArrayList(),
TableType.JDBC_EXTERNAL_TABLE);
- jdbcTable.setCatalogId(this.getId());
- jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
- jdbcTable.setJdbcUrl(this.getJdbcUrl());
- jdbcTable.setJdbcUser(this.getJdbcUser());
- jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
- jdbcTable.setDriverClass(this.getDriverClass());
- jdbcTable.setDriverUrl(this.getDriverUrl());
+ this.configureJdbcTable(jdbcTable, "test_jdbc_connection");
+
+ // Special checksum computation
jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
- jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
- jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
-
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
-
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
- jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
+
return jdbcTable;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 242b973b87e..b31fc5c24a9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.Optional;
/**
- * Elasticsearch external table.
+ * Jdbc external table.
*/
public class JdbcExternalTable extends ExternalTable {
private static final Logger LOG =
LogManager.getLogger(JdbcExternalTable.class);
@@ -83,27 +83,13 @@ public class JdbcExternalTable extends ExternalTable {
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
String fullDbName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema,
TableType.JDBC_EXTERNAL_TABLE);
- jdbcTable.setCatalogId(jdbcCatalog.getId());
- jdbcTable.setExternalTableName(fullDbName);
- jdbcTable.setRemoteDatabaseName(
- ((JdbcExternalCatalog)
catalog).getJdbcClient().getRemoteDatabaseName(this.dbName));
- jdbcTable.setRemoteTableName(
- ((JdbcExternalCatalog)
catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name));
- jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog)
catalog).getJdbcClient().getRemoteColumnNames(this.dbName,
- this.name));
- jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
- jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());
- jdbcTable.setJdbcUser(jdbcCatalog.getJdbcUser());
- jdbcTable.setJdbcPasswd(jdbcCatalog.getJdbcPasswd());
- jdbcTable.setDriverClass(jdbcCatalog.getDriverClass());
- jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
- jdbcTable.setResourceName(jdbcCatalog.getResource());
- jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
-
jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize());
-
jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize());
-
jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime());
-
jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime());
-
jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive());
+ jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
+
+ // Set remote properties
+
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
+
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName,
this.name));
+
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName,
this.name));
+
return jdbcTable;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 05346a8db9e..604e54277f1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -38,7 +38,9 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
@@ -213,6 +215,58 @@ public abstract class JdbcClient {
}
}
+ /**
+ * Execute query via jdbc
+ *
+ * @param query, the query string
+ * @return List<Column>
+ */
+ public List<Column> getColumnsFromQuery(String query) {
+ Connection conn = getConnection();
+ List<Column> columns = Lists.newArrayList();
+ try {
+ PreparedStatement pstmt = conn.prepareStatement(query);
+ ResultSetMetaData metaData = pstmt.getMetaData();
+ if (metaData == null) {
+ throw new JdbcClientException("Query not supported: Failed to
get ResultSetMetaData from query: %s",
+ query);
+ } else {
+ List<JdbcFieldSchema> schemas =
getSchemaFromResultSetMetaData(metaData);
+ for (JdbcFieldSchema schema : schemas) {
+ columns.add(new Column(schema.getColumnName(),
jdbcTypeToDoris(schema), true, null, true, null,
+ true, -1));
+ }
+ }
+ } catch (SQLException e) {
+ throw new JdbcClientException("Failed to get columns from query:
%s", e, query);
+ } finally {
+ close(conn);
+ }
+ return columns;
+ }
+
+
+ /**
+ * Get schema from ResultSetMetaData
+ *
+ * @param metaData, the ResultSetMetaData
+ * @return List<JdbcFieldSchema>
+ */
+ public List<JdbcFieldSchema>
getSchemaFromResultSetMetaData(ResultSetMetaData metaData) throws SQLException {
+ List<JdbcFieldSchema> schemas = Lists.newArrayList();
+ for (int i = 1; i <= metaData.getColumnCount(); i++) {
+ JdbcFieldSchema field = new JdbcFieldSchema();
+ field.setColumnName(metaData.getColumnName(i));
+ field.setDataType(metaData.getColumnType(i));
+ field.setDataTypeName(metaData.getColumnTypeName(i));
+ field.setColumnSize(metaData.getColumnDisplaySize(i));
+ field.setDecimalDigits(metaData.getScale(i));
+ field.setNumPrecRadix(metaData.getPrecision(i));
+ schemas.add(field);
+ }
+ return schemas;
+ }
+
// This part used to process meta-information of database, table and
column.
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index f7fccf6d717..58ab0f9d226 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -69,6 +69,8 @@ public class JdbcScanNode extends ExternalScanNode {
private String tableName;
private TOdbcTableType jdbcType;
private String graphQueryString = "";
+ private boolean isTableValuedFunction = false;
+ private String query = "";
private JdbcTable tbl;
@@ -84,6 +86,15 @@ public class JdbcScanNode extends ExternalScanNode {
tableName = tbl.getProperRemoteFullTableName(jdbcType);
}
+ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isTableValuedFunction, String query) {
+ super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+ this.isTableValuedFunction = isTableValuedFunction;
+ this.query = query;
+ tbl = (JdbcTable) desc.getTable();
+ jdbcType = tbl.getJdbcTableType();
+ tableName = tbl.getExternalTableName();
+ }
+
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
@@ -232,14 +243,19 @@ public class JdbcScanNode extends ExternalScanNode {
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
StringBuilder output = new StringBuilder();
- output.append(prefix).append("TABLE: ").append(tableName).append("\n");
- if (detailLevel == TExplainLevel.BRIEF) {
- return output.toString();
- }
- output.append(prefix).append("QUERY:
").append(getJdbcQueryStr()).append("\n");
- if (!conjuncts.isEmpty()) {
- Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
- output.append(prefix).append("PREDICATES:
").append(expr.toSql()).append("\n");
+ if (isTableValuedFunction) {
+ output.append(prefix).append("TABLE VALUE FUNCTION\n");
+ output.append(prefix).append("QUERY: ").append(query).append("\n");
+ } else {
+ output.append(prefix).append("TABLE:
").append(tableName).append("\n");
+ if (detailLevel == TExplainLevel.BRIEF) {
+ return output.toString();
+ }
+ output.append(prefix).append("QUERY:
").append(getJdbcQueryStr()).append("\n");
+ if (!conjuncts.isEmpty()) {
+ Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
+ output.append(prefix).append("PREDICATES:
").append(expr.toSql()).append("\n");
+ }
}
return output.toString();
}
@@ -286,7 +302,11 @@ public class JdbcScanNode extends ExternalScanNode {
msg.jdbc_scan_node = new TJdbcScanNode();
msg.jdbc_scan_node.setTupleId(desc.getId().asInt());
msg.jdbc_scan_node.setTableName(tableName);
- msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
+ if (isTableValuedFunction) {
+ msg.jdbc_scan_node.setQueryString(query);
+ } else {
+ msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
+ }
msg.jdbc_scan_node.setTableType(jdbcType);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
new file mode 100644
index 00000000000..4c379c7e46d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.QueryTableValueFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/** query */
+public class Query extends TableValuedFunction {
+ public Query(Properties properties) {
+ super("query", properties);
+ }
+
+ @Override
+ public FunctionSignature customSignature() {
+ return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX,
getArgumentsTypes());
+ }
+
+ @Override
+ protected TableValuedFunctionIf toCatalogFunction() {
+ try {
+ Map<String, String> arguments = getTVFProperties().getMap();
+ return
QueryTableValueFunction.createQueryTableValueFunction(arguments);
+ } catch (Throwable t) {
+ throw new AnalysisException("Can not build
QueryTableValuedFunction by "
+ + this + ": " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+ return visitor.visitQuery(this, context);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index d0c76d143a2..bd09a81b011 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -29,6 +29,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
@@ -92,4 +93,8 @@ public interface TableValuedFunctionVisitor<R, C> {
default R visitS3(S3 s3, C context) {
return visitTableValuedFunction(s3, context);
}
+
+ default R visitQuery(Query query, C context) {
+ return visitTableValuedFunction(query, context);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
new file mode 100644
index 00000000000..b884dab3882
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class JdbcQueryTableValueFunction extends QueryTableValueFunction {
+ public static final Logger LOG =
LogManager.getLogger(JdbcQueryTableValueFunction.class);
+
+ public JdbcQueryTableValueFunction(Map<String, String> params) throws
AnalysisException {
+ super(params);
+ }
+
+ @Override
+ public List<Column> getTableColumns() throws AnalysisException {
+ JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
+ return catalog.getColumnsFromQuery(query);
+ }
+
+ @Override
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
+ JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(),
desc.getTable().getFullSchema(),
+ TableType.JDBC);
+ catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
+ desc.setTable(jdbcTable);
+ return new JdbcScanNode(id, desc, true, query);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
new file mode 100644
index 00000000000..3865d0b25c6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class QueryTableValueFunction extends TableValuedFunctionIf {
+ public static final Logger LOG =
LogManager.getLogger(QueryTableValueFunction.class);
+ public static final String NAME = "query";
+ private static final String CATALOG = "catalog";
+ private static final String QUERY = "query";
+ protected CatalogIf catalogIf;
+ protected final String query;
+
+ public QueryTableValueFunction(Map<String, String> params) throws
AnalysisException {
+ if (params.size() != 2) {
+ throw new AnalysisException("Query TableValueFunction must have 2
arguments: 'catalog' and 'query'");
+ }
+ if (!params.containsKey(CATALOG) || !params.containsKey(QUERY)) {
+ throw new AnalysisException("Query TableValueFunction must have 2
arguments: 'catalog' and 'query'");
+ }
+ String catalogName = params.get(CATALOG);
+ this.query = params.get(QUERY);
+ this.catalogIf =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+ }
+
+ public static QueryTableValueFunction
createQueryTableValueFunction(Map<String, String> params)
+ throws AnalysisException {
+ String catalogName = params.get(CATALOG);
+
+ // check priv
+ UserIdentity userIdentity =
ConnectContext.get().getCurrentUserIdentity();
+ if (!Env.getCurrentEnv().getAuth().checkCtlPriv(userIdentity,
catalogName, PrivPredicate.SELECT)) {
+ throw new org.apache.doris.nereids.exceptions.AnalysisException(
+ "user " + userIdentity + " has no privilege to query in
catalog " + catalogName);
+ }
+
+ CatalogIf catalogIf =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+ if (catalogIf == null) {
+ throw new AnalysisException("Catalog not found: " + catalogName);
+ }
+ if (catalogIf instanceof JdbcExternalCatalog) {
+ return new JdbcQueryTableValueFunction(params);
+ } else {
+ throw new AnalysisException(
+ "Catalog not supported query tvf: " + catalogName + ",
catalog type:" + catalogIf.getType());
+ }
+ }
+
+ @Override
+ public String getTableName() {
+ return "QueryTableValueFunction";
+ }
+
+ @Override
+ public abstract List<Column> getTableColumns() throws AnalysisException;
+
+ @Override
+ public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 41ed6e14cb2..c99da94de0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -72,6 +72,8 @@ public abstract class TableValuedFunctionIf {
return new TasksTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
+ case QueryTableValueFunction.NAME:
+ return
QueryTableValueFunction.createQueryTableValueFunction(params);
default:
throw new AnalysisException("Could not find table function " +
funcName);
}
diff --git
a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out
b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out
new file mode 100644
index 00000000000..27110566d7a
--- /dev/null
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+bigint BIGINT Yes true \N NONE
+bigint_u LARGEINT Yes true \N NONE
+binary TEXT Yes true \N NONE
+bit TEXT Yes true \N NONE
+blob TEXT Yes true \N NONE
+boolean TINYINT Yes true \N NONE
+char CHAR(6) Yes true \N NONE
+date DATE Yes true \N NONE
+datetime DATETIME Yes true \N NONE
+decimal DECIMAL(12, 4) Yes true \N NONE
+decimal_u DECIMAL(19, 5) Yes true \N NONE
+double DOUBLE Yes true \N NONE
+double_u DOUBLE Yes true \N NONE
+enum CHAR(6) Yes true \N NONE
+float FLOAT Yes true \N NONE
+float_u FLOAT Yes true \N NONE
+int INT Yes true \N NONE
+int_u BIGINT Yes true \N NONE
+json TEXT Yes true \N NONE
+mediumint INT Yes true \N NONE
+mediumint_u INT Yes true \N NONE
+set CHAR(6) Yes true \N NONE
+smallint SMALLINT Yes true \N NONE
+smallint_u INT Yes true \N NONE
+text TEXT Yes true \N NONE
+time TEXT Yes true \N NONE
+timestamp DATETIME(4) Yes true \N NONE
+tinyint TINYINT Yes true \N NONE
+tinyint_u SMALLINT Yes true \N NONE
+varbinary TEXT Yes true \N NONE
+varchar VARCHAR(10) Yes true \N NONE
+year SMALLINT Yes true \N NONE
+
+-- !sql --
+\N 302 \N 502 602 4.14159 \N 6.14159 \N -124
-302 2013 -402 -502 -602 \N 2012-10-26T02:08:39.345700
2013-10-26T08:09:18 -5.14145 \N -7.1400 row2 \N
09:11:09.567 text2 0xE86F6C6C6F20576F726C67 \N \N 0x2F
\N 0x88656C6C9F Value3
+201 301 401 501 601 3.14159 4.1415926 5.14159 1
-123 -301 2012 -401 -501 -601 2012-10-30
2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145
-5.1400000001 -6.1400 row1 line1 09:09:09.567 text1
0x48656C6C6F20576F726C64 {"age": 30, "city": "London", "name": "Alice"}
Option1,Option3 0x2A 0x48656C6C6F00000000000000 0x48656C6C6F Value2
+202 302 402 502 602 4.14159 5.1415926 6.14159 0
-124 -302 2013 -402 -502 -602 2012-11-01
2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145
-6.1400000001 -7.1400 row2 line2 09:11:09.567 text2
0xE86F6C6C6F20576F726C67 {"age": 18, "city": "ChongQing", "name":
"Gaoxin"} Option1,Option2 0x2F 0x58676C6C6F00000000000000
0x88656C6C9F Value3
+203 303 403 503 603 7.14159 8.1415926 9.14159 0
\N -402 2017 -602 -902 -1102 2012-11-02 \N
2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3
line3 09:11:09.567 text3 0xE86F6C6C6F20576F726C67 {"age": 24,
"city": "ChongQing", "name": "ChenQi"} Option2 0x2F
0x58676C6C6F00000000000000 \N Value1
+
+-- !sql --
+4
+
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy
new file mode 100644
index 00000000000..dcf36554be1
--- /dev/null
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_jdbc_query_tvf") {
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String user = "test_jdbc_user";
+ String pwd = '123456';
+ String catalog_name = "mysql_jdbc_catalog";
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+
+ sql """drop catalog if exists ${catalog_name} """
+
+ sql """create catalog if not exists ${catalog_name} properties(
+ "type"="jdbc",
+ "user"="root",
+ "password"="123456",
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver"
+ );"""
+
+ order_qt_sql """desc function query('catalog' = '${catalog_name}',
'query' = 'select * from doris_test.all_types') """
+ order_qt_sql """select * from query('catalog' = '${catalog_name}',
'query' = 'select * from doris_test.all_types') """
+ order_qt_sql """select * from query('catalog' = '${catalog_name}',
'query' = 'select count(*) as cnt from doris_test.all_types') """
+
+// sql """drop catalog if exists ${catalog_name} """
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]