This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ca93a4e5c [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
ca93a4e5c is described below
commit ca93a4e5c23f044fcd7afd5d05a5529f063a84e6
Author: yehere <[email protected]>
AuthorDate: Thu Mar 30 20:37:55 2023 +0800
[KYUUBI #4325] Support replace preparedStatement for Trino-jdbc
### _Why are the changes needed?_
close #4325
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #4417 from yehere/kyuubi-4325.
Closes #4325
7b2864b53 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc,update the dependency file, run './build/dependency.sh --replace'
749b1c15c [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc,Code optimization
33ea9ba2b [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc,Code optimization
568418a21 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc,Add test case for DEALLOCATE PREPARE
358a8e3b8 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc,Support DEALLOCATE PREPARE
7d4a32402 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
91392add6 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
63bf8c462 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
f5b7fb786 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
b0476a79d [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
1a8f147a0 [yehere] [KYUUBI #4325] Support replace preparedStatement for
Trino-jdbc
Authored-by: yehere <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
dev/dependencyList | 5 ++
.../it/trino/server/TrinoFrontendSuite.scala | 32 +++++++++---
.../apache/kyuubi/operation/JDBCTestHelper.scala | 33 +++++++++++-
.../kyuubi/jdbc/hive/KyuubiPreparedStatement.java | 54 +-------------------
.../java/org/apache/kyuubi/jdbc/hive/Utils.java | 57 +++++++++++++++++++++
kyuubi-server/pom.xml | 6 +++
.../apache/kyuubi/sql/KyuubiTrinoFeBaseLexer.g4 | 21 ++++++++
.../apache/kyuubi/sql/KyuubiTrinoFeBaseParser.g4 | 11 ++++
.../org/apache/kyuubi/server/trino/api/Query.scala | 58 +++++++++++++++++++++-
.../kyuubi/server/trino/api/TrinoContext.scala | 12 +++--
.../server/trino/api/v1/StatementResource.scala | 43 ++++++++++++++--
.../sql/parser/trino/KyuubiTrinoFeAstBuilder.scala | 19 ++++++-
.../sql/parser/trino/KyuubiTrinoFeParser.scala | 1 +
.../kyuubi/sql/plan/trino/TrinoFeOperations.scala | 15 ++++++
.../parser/trino/KyuubiTrinoFeParserSuite.scala | 20 +++++++-
.../server/trino/api/TrinoClientApiSuite.scala | 1 +
16 files changed, 317 insertions(+), 71 deletions(-)
diff --git a/dev/dependencyList b/dev/dependencyList
index 509e28405..ab7697d35 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -22,6 +22,10 @@ annotations/4.1.1.4//annotations-4.1.1.4.jar
antlr-runtime/3.5.3//antlr-runtime-3.5.3.jar
antlr4-runtime/4.9.3//antlr4-runtime-4.9.3.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
+arrow-format/11.0.0//arrow-format-11.0.0.jar
+arrow-memory-core/11.0.0//arrow-memory-core-11.0.0.jar
+arrow-memory-netty/11.0.0//arrow-memory-netty-11.0.0.jar
+arrow-vector/11.0.0//arrow-vector-11.0.0.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-collections/3.2.2//commons-collections-3.2.2.jar
@@ -35,6 +39,7 @@ derby/10.14.2.0//derby-10.14.2.0.jar
error_prone_annotations/2.14.0//error_prone_annotations-2.14.0.jar
failsafe/2.4.4//failsafe-2.4.4.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
+flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
fliptables/1.0.2//fliptables-1.0.2.jar
grpc-api/1.48.0//grpc-api-1.48.0.jar
grpc-context/1.48.0//grpc-context-1.48.0.jar
diff --git
a/integration-tests/kyuubi-trino-it/src/test/scala/org/apache/kyuubi/it/trino/server/TrinoFrontendSuite.scala
b/integration-tests/kyuubi-trino-it/src/test/scala/org/apache/kyuubi/it/trino/server/TrinoFrontendSuite.scala
index bd8bf3eda..4a175a28b 100644
---
a/integration-tests/kyuubi-trino-it/src/test/scala/org/apache/kyuubi/it/trino/server/TrinoFrontendSuite.scala
+++
b/integration-tests/kyuubi-trino-it/src/test/scala/org/apache/kyuubi/it/trino/server/TrinoFrontendSuite.scala
@@ -26,18 +26,37 @@ import org.apache.kyuubi.operation.SparkMetadataTests
/**
* This test is for Trino jdbc driver with Kyuubi Server and Spark engine:
*
- * -------------------------------------------------------------
- * | JDBC |
- * | Trino-driver ----> Kyuubi Server --> Spark Engine |
- * | |
- * -------------------------------------------------------------
+ * -------------------------------------------------------------
+ * | JDBC |
+ * | Trino-driver ----> Kyuubi Server --> Spark Engine |
+ * | |
+ * -------------------------------------------------------------
*/
class TrinoFrontendSuite extends WithKyuubiServer with SparkMetadataTests {
- // TODO: Add more test cases
+
+ test("execute statement - select 11 where 1=1") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 11 where 1<1")
+ while (resultSet.next()) {
+ assert(resultSet.getInt(1) === 11)
+ }
+ }
+ }
+
+ test("execute preparedStatement - select 11 where 1 = 1") {
+ withJdbcPrepareStatement("select 11 where 1 = ? ") { statement =>
+ statement.setInt(1, 1)
+ val rs = statement.executeQuery()
+ while (rs.next()) {
+ assert(rs.getInt(1) == 11)
+ }
+ }
+ }
override protected val conf: KyuubiConf = {
KyuubiConf().set(KyuubiConf.FRONTEND_PROTOCOLS, Seq("TRINO"))
}
+
override protected def jdbcUrl: String = {
s"jdbc:trino://${server.frontendServices.head.connectionUrl}/;"
}
@@ -47,7 +66,6 @@ class TrinoFrontendSuite extends WithKyuubiServer with
SparkMetadataTests {
override def beforeAll(): Unit = {
super.beforeAll()
-
// eagerly start spark engine before running test, it's a workaround for
trino jdbc driver
// since it does not support changing http connect timeout
try {
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
index 663fd1816..97330837d 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
-import java.sql.{DriverManager, SQLException, Statement}
+import java.sql.{DriverManager, PreparedStatement, SQLException, Statement}
import java.util.Locale
import org.apache.kyuubi.KyuubiFunSuite
@@ -75,6 +75,31 @@ trait JDBCTestHelper extends KyuubiFunSuite {
}
}
+ def withMultipleConnectionJdbcPrepareStatement(
+ sql: String,
+ tableNames: String*)(fs: (PreparedStatement => Unit)*): Unit = {
+ val connections = fs.map { _ =>
DriverManager.getConnection(jdbcUrlWithConf, user, password) }
+ val statements = connections.map(_.prepareStatement(sql))
+
+ try {
+ statements.zip(fs).foreach { case (s, f) => f(s) }
+ } finally {
+ tableNames.foreach { name =>
+ if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
+ statements.head.execute(s"DROP VIEW IF EXISTS $name")
+ } else {
+ statements.head.execute(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ info("Closing statements")
+ statements.foreach(_.close())
+ info("Closed statements")
+ info("Closing connections")
+ connections.foreach(_.close())
+ info("Closed connections")
+ }
+ }
+
def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ =>
DriverManager.getConnection(jdbcUrlWithConf, user, password) }
val statements = connections.map(_.createStatement())
@@ -97,4 +122,10 @@ trait JDBCTestHelper extends KyuubiFunSuite {
def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
+
+ def withJdbcPrepareStatement(
+ sql: String,
+ tableNames: String*)(f: PreparedStatement => Unit): Unit = {
+ withMultipleConnectionJdbcPrepareStatement(sql, tableNames: _*)(f)
+ }
}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiPreparedStatement.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiPreparedStatement.java
index 43c2a030b..a0d4f3bfd 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiPreparedStatement.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiPreparedStatement.java
@@ -26,9 +26,7 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Scanner;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
@@ -81,57 +79,7 @@ public class KyuubiPreparedStatement extends KyuubiStatement
implements SQLPrepa
/** update the SQL string with parameters set by setXXX methods of {@link
PreparedStatement} */
private String updateSql(final String sql, HashMap<Integer, String>
parameters)
throws SQLException {
- List<String> parts = splitSqlStatement(sql);
-
- StringBuilder newSql = new StringBuilder(parts.get(0));
- for (int i = 1; i < parts.size(); i++) {
- if (!parameters.containsKey(i)) {
- throw new KyuubiSQLException("Parameter #" + i + " is unset");
- }
- newSql.append(parameters.get(i));
- newSql.append(parts.get(i));
- }
- return newSql.toString();
- }
-
- /**
- * Splits the parametered sql statement at parameter boundaries.
- *
- * <p>taking into account ' and \ escaping.
- *
- * <p>output for: 'select 1 from ? where a = ?' ['select 1 from ',' where a
= ','']
- */
- private List<String> splitSqlStatement(String sql) {
- List<String> parts = new ArrayList<>();
- int apCount = 0;
- int off = 0;
- boolean skip = false;
-
- for (int i = 0; i < sql.length(); i++) {
- char c = sql.charAt(i);
- if (skip) {
- skip = false;
- continue;
- }
- switch (c) {
- case '\'':
- apCount++;
- break;
- case '\\':
- skip = true;
- break;
- case '?':
- if ((apCount & 1) == 0) {
- parts.add(sql.substring(off, i));
- off = i + 1;
- }
- break;
- default:
- break;
- }
- }
- parts.add(sql.substring(off, sql.length()));
- return parts;
+ return Utils.updateSql(sql, parameters);
}
@Override
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java
index 1daa322ec..ac9b29664 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java
@@ -22,6 +22,7 @@ import static
org.apache.kyuubi.jdbc.hive.JdbcConnectionParams.*;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Matcher;
@@ -89,6 +90,62 @@ public class Utils {
throw new KyuubiSQLException(status);
}
+ /**
+ * Splits the parametered sql statement at parameter boundaries.
+ *
+ * <p>taking into account ' and \ escaping.
+ *
+ * <p>output for: 'select 1 from ? where a = ?' ['select 1 from ',' where a
= ','']
+ */
+ static List<String> splitSqlStatement(String sql) {
+ List<String> parts = new ArrayList<>();
+ int apCount = 0;
+ int off = 0;
+ boolean skip = false;
+
+ for (int i = 0; i < sql.length(); i++) {
+ char c = sql.charAt(i);
+ if (skip) {
+ skip = false;
+ continue;
+ }
+ switch (c) {
+ case '\'':
+ apCount++;
+ break;
+ case '\\':
+ skip = true;
+ break;
+ case '?':
+ if ((apCount & 1) == 0) {
+ parts.add(sql.substring(off, i));
+ off = i + 1;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ parts.add(sql.substring(off, sql.length()));
+ return parts;
+ }
+
+ /** update the SQL string with parameters set by setXXX methods of {@link
PreparedStatement} */
+ public static String updateSql(final String sql, HashMap<Integer, String>
parameters)
+ throws SQLException {
+ List<String> parts = splitSqlStatement(sql);
+
+ StringBuilder newSql = new StringBuilder(parts.get(0));
+ for (int i = 1; i < parts.size(); i++) {
+ if (!parameters.containsKey(i)) {
+ throw new KyuubiSQLException("Parameter #" + i + " is unset");
+ }
+ newSql.append(parameters.get(i));
+ newSql.append(parts.get(i));
+ }
+ return newSql.toString();
+ }
+
public static JdbcConnectionParams parseURL(String uri)
throws JdbcUriParseException, SQLException, ZooKeeperHiveClientException
{
return parseURL(uri, new Properties());
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 83698fa5a..7408ac5dd 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -36,6 +36,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-events_${scala.binary.version}</artifactId>
diff --git
a/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseLexer.g4
b/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseLexer.g4
index 0cc02de64..83810a073 100644
---
a/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseLexer.g4
+++
b/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseLexer.g4
@@ -43,6 +43,10 @@ FALSE: 'FALSE';
LIKE: 'LIKE';
IN: 'IN';
WHERE: 'WHERE';
+EXECUTE: 'EXECUTE';
+PREPARE: 'PREPARE';
+DEALLOCATE: 'DEALLOCATE';
+USING: 'USING';
ESCAPE: 'ESCAPE';
AUTO_INCREMENT: 'AUTO_INCREMENT';
@@ -98,7 +102,16 @@ SOURCE_DATA_TYPE: 'SOURCE_DATA_TYPE';
IS_AUTOINCREMENT: 'IS_AUTOINCREMENT';
IS_GENERATEDCOLUMN: 'IS_GENERATEDCOLUMN';
VARCHAR: 'VARCHAR';
+TINYINT: 'TINYINT';
SMALLINT: 'SMALLINT';
+INTEGER: 'INTEGER';
+BIGINT: 'BIGINT';
+REAL: 'REAL';
+DOUBLE: 'DOUBLE';
+DECIMAL: 'DECIMAL';
+DATE: 'DATE';
+TIME: 'TIME';
+TIMESTAMP: 'TIMESTAMP';
CAST: 'CAST';
AS: 'AS';
KEY_SEQ: 'KEY_SEQ';
@@ -114,6 +127,10 @@ STRING
: '\'' ( ~'\'' | '\'\'' )* '\''
;
+STRING_MARK
+ : '\''
+ ;
+
SIMPLE_COMMENT
: '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
;
@@ -125,6 +142,10 @@ BRACKETED_COMMENT
WS : [ \r\n\t]+ -> channel(HIDDEN)
;
+IDENTIFIER
+ :
[A-Za-z_$0-9\u0080-\uFFFF]*?[A-Za-z_$\u0080-\uFFFF]+?[A-Za-z_$0-9\u0080-\uFFFF]*
+ ;
+
// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
diff --git
a/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseParser.g4
b/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseParser.g4
index 6af00af5d..72811e592 100644
---
a/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseParser.g4
+++
b/kyuubi-server/src/main/antlr4/org/apache/kyuubi/sql/KyuubiTrinoFeBaseParser.g4
@@ -54,9 +54,20 @@ statement
CAST LEFT_PAREN NULL AS SMALLINT RIGHT_PAREN KEY_SEQ COMMA
CAST LEFT_PAREN NULL AS VARCHAR RIGHT_PAREN PK_NAME
WHERE FALSE
#getPrimaryKeys
+ | EXECUTE IDENTIFIER (USING parameterList)?
#execute
+ | PREPARE IDENTIFIER FROM statement
#prepare
+ | DEALLOCATE PREPARE IDENTIFIER
#deallocate
| .*?
#passThrough
;
+anyStr
+ : ( ~',' )*
+ ;
+
+parameterList
+ :
(TINYINT|SMALLINT|INTEGER|BIGINT|DOUBLE|REAL|DECIMAL|DATE|TIME|TIMESTAMP)?
anyStr (','
(TINYINT|SMALLINT|INTEGER|BIGINT|DOUBLE|REAL|DECIMAL|DATE|TIME|TIMESTAMP)?
anyStr)*
+ ;
+
tableCatalogFilter
: (TABLE_CAT | TABLE_CATALOG) IS NULL
#nullCatalog
| (TABLE_CAT | TABLE_CATALOG) EQ catalog=STRING+
#catalogFilter
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
index 925875579..4e768b04a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala
@@ -30,12 +30,13 @@ import scala.collection.mutable
import Slug.Context.{EXECUTING_QUERY, QUEUED_QUERY}
import com.google.common.hash.Hashing
import io.trino.client.QueryResults
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TBoolValue, TColumnDesc,
TColumnValue, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TProtocolVersion,
TRow, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
-import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
+import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle,
OperationState, OperationStatus}
import org.apache.kyuubi.operation.OperationState.{FINISHED, INITIALIZED,
OperationState, PENDING}
import org.apache.kyuubi.server.trino.api.Query.KYUUBI_SESSION_ID
import org.apache.kyuubi.service.BackendService
+import org.apache.kyuubi.service.TFrontendService.OK_STATUS
import org.apache.kyuubi.session.SessionHandle
case class Query(
@@ -85,6 +86,45 @@ case class Query(
}
}
+ def getPrepareQueryResults(
+ token: Long,
+ uriInfo: UriInfo,
+ maxWait: Long = 0): QueryResults = {
+ val status = OperationStatus(OperationState.FINISHED, 0, 0, 0, 0, false)
+ val nextUri = null
+ val queryHtmlUri = uriInfo.getRequestUriBuilder
+ .replacePath("ui/query.html").replaceQuery(queryId.getQueryId).build()
+
+ val columns = new TGetResultSetMetadataResp()
+ columns.setStatus(OK_STATUS)
+ val tColumnDesc = new TColumnDesc()
+ tColumnDesc.setColumnName("result")
+ val desc = new TTypeDesc
+ desc.addToTypes(TTypeEntry.primitiveEntry(new
TPrimitiveTypeEntry(TTypeId.BOOLEAN_TYPE)))
+ tColumnDesc.setTypeDesc(desc)
+ tColumnDesc.setPosition(0)
+ val schema = new TTableSchema()
+ schema.addToColumns(tColumnDesc)
+ columns.setSchema(schema)
+
+ val rows = new java.util.ArrayList[TRow]
+ val trow = new TRow()
+ val value = new TBoolValue()
+ value.setValue(true)
+ trow.addToColVals(TColumnValue.boolVal(value))
+ rows.add(trow)
+ val rowSet = new TRowSet(0, rows)
+
+ TrinoContext.createQueryResults(
+ queryId.getQueryId,
+ nextUri,
+ queryHtmlUri,
+ status,
+ Option(columns),
+ Option(rowSet),
+ updateType = "PREPARE")
+ }
+
def getLastToken: Long = this.lastToken.get()
def getSlug: Slug = this.slug
@@ -152,6 +192,20 @@ object Query {
Query(QueryId(operationHandle), updatedContext, backendService)
}
+ def apply(
+ statementId: String,
+ statement: String,
+ context: TrinoContext,
+ backendService: BackendService): Query = {
+ val sessionHandle = getOrCreateSession(context, backendService)
+ val sessionWithId =
+ context.session + (KYUUBI_SESSION_ID ->
sessionHandle.identifier.toString)
+ Query(
+ queryId = QueryId(new OperationHandle(UUID.randomUUID())),
+ context.copy(preparedStatement = Map(statementId -> statement), session
= sessionWithId),
+ backendService)
+ }
+
def apply(id: String, context: TrinoContext, backendService:
BackendService): Query = {
Query(QueryId(id), context, backendService)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
index 8c85f31d7..16fc0388a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/TrinoContext.scala
@@ -187,14 +187,20 @@ object TrinoContext {
queryHtmlUri: URI,
queryStatus: OperationStatus,
columns: Option[TGetResultSetMetadataResp] = None,
- data: Option[TRowSet] = None): QueryResults = {
+ data: Option[TRowSet] = None,
+ updateType: String = null): QueryResults = {
val columnList = columns match {
case Some(value) => convertTColumn(value)
case None => null
}
val rowList = data match {
- case Some(value) => convertTRowSet(value)
+ case Some(value) =>
+ Option(updateType) match {
+ case Some("PREPARE") =>
+
ImmutableList.of(ImmutableList.of(true).asInstanceOf[util.List[Object]])
+ case _ => convertTRowSet(value)
+ }
case None => null
}
@@ -214,7 +220,7 @@ object TrinoContext {
.setElapsedTimeMillis(0).setQueuedTimeMillis(0).build(),
toQueryError(queryStatus),
defaultWarning,
- null,
+ updateType,
0L)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
index e051dbb23..124b84688 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/v1/StatementResource.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.server.trino.api.v1
+import java.util
+import java.util.UUID
import javax.ws.rs._
import javax.ws.rs.core.{Context, HttpHeaders, MediaType, Response, UriInfo}
import javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE
@@ -32,16 +34,21 @@ import io.swagger.v3.oas.annotations.tags.Tag
import io.trino.client.QueryResults
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.jdbc.hive.Utils
+import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.server.trino.api.{ApiRequestContext,
KyuubiTrinoOperationTranslator, Query, QueryId, Slug, TrinoContext}
import org.apache.kyuubi.server.trino.api.Slug.Context.{EXECUTING_QUERY,
QUEUED_QUERY}
import org.apache.kyuubi.server.trino.api.v1.dto.Ok
import org.apache.kyuubi.service.BackendService
+import org.apache.kyuubi.sql.parser.trino.KyuubiTrinoFeParser
+import org.apache.kyuubi.sql.plan.trino.{Deallocate, ExecuteForPreparing,
Prepare}
@Tag(name = "Statement")
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class StatementResource extends ApiRequestContext with Logging {
lazy val translator = new KyuubiTrinoOperationTranslator(fe.be)
+ lazy val parser = new KyuubiTrinoFeParser()
@ApiResponse(
responseCode = "200",
@@ -73,9 +80,39 @@ private[v1] class StatementResource extends
ApiRequestContext with Logging {
val trinoContext = TrinoContext(headers, remoteAddr)
try {
- val query = Query(statement, trinoContext, translator, fe.be)
- val qr = query.getQueryResults(query.getLastToken, uriInfo)
- TrinoContext.buildTrinoResponse(qr, query.context)
+ parser.parsePlan(statement) match {
+ case Prepare(statementId, _) =>
+ val query = Query(
+ statementId,
+ statement.split(s"$statementId FROM")(1),
+ trinoContext,
+ fe.be)
+ val qr = query.getPrepareQueryResults(query.getLastToken, uriInfo)
+ TrinoContext.buildTrinoResponse(qr, query.context)
+ case ExecuteForPreparing(statementId, parameters) =>
+ val parametersMap = new util.HashMap[Integer, String]()
+ for (i <- 0 until parameters.size) {
+ parametersMap.put(i + 1, parameters(i))
+ }
+ trinoContext.preparedStatement.get(statementId).map { originSql =>
+ val realSql = Utils.updateSql(originSql, parametersMap)
+ val query = Query(realSql, trinoContext, translator, fe.be)
+ val qr = query.getQueryResults(query.getLastToken, uriInfo)
+ TrinoContext.buildTrinoResponse(qr, query.context)
+ }.get
+ case Deallocate(statementId) =>
+ info(s"DEALLOCATE PREPARE ${statementId}")
+ val query = Query(
+ QueryId(new OperationHandle(UUID.randomUUID())),
+ trinoContext,
+ fe.be)
+ val qr = query.getPrepareQueryResults(query.getLastToken, uriInfo)
+ TrinoContext.buildTrinoResponse(qr, query.context)
+ case _ =>
+ val query = Query(statement, trinoContext, translator, fe.be)
+ val qr = query.getQueryResults(query.getLastToken, uriInfo)
+ TrinoContext.buildTrinoResponse(qr, query.context)
+ }
} catch {
case e: Exception =>
val errorMsg =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeAstBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeAstBuilder.scala
index 061985c1c..8d1e38519 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeAstBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeAstBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.kyuubi.sql.KyuubiTrinoFeBaseParser._
import org.apache.kyuubi.sql.KyuubiTrinoFeBaseParserBaseVisitor
import org.apache.kyuubi.sql.parser.KyuubiParser.unescapeSQLString
import org.apache.kyuubi.sql.plan.{KyuubiTreeNode, PassThroughNode}
-import org.apache.kyuubi.sql.plan.trino.{GetCatalogs, GetColumns,
GetPrimaryKeys, GetSchemas, GetTables, GetTableTypes, GetTypeInfo}
+import org.apache.kyuubi.sql.plan.trino.{Deallocate, ExecuteForPreparing,
GetCatalogs, GetColumns, GetPrimaryKeys, GetSchemas, GetTables, GetTableTypes,
GetTypeInfo, Prepare}
class KyuubiTrinoFeAstBuilder extends
KyuubiTrinoFeBaseParserBaseVisitor[AnyRef] {
@@ -123,4 +123,21 @@ class KyuubiTrinoFeAstBuilder extends
KyuubiTrinoFeBaseParserBaseVisitor[AnyRef]
override def visitTypesFilter(ctx: TypesFilterContext): List[String] = {
ctx.stringLit().asScala.map(v => unescapeSQLString(v.getText)).toList
}
+
+ override def visitExecute(ctx: ExecuteContext): KyuubiTreeNode = {
+ val parameters = Option(ctx.parameterList()) match {
+ case Some(para) =>
+ para.anyStr().asScala.toList.map(p => p.getText.substring(1,
p.getText.length - 1))
+ case None => List[String]()
+ }
+ ExecuteForPreparing(ctx.IDENTIFIER().getText, parameters)
+ }
+
+ override def visitPrepare(ctx: PrepareContext): KyuubiTreeNode = {
+ Prepare(ctx.IDENTIFIER().getText, ctx.statement().getText)
+ }
+
+ override def visitDeallocate(ctx: DeallocateContext): KyuubiTreeNode = {
+ Deallocate(ctx.IDENTIFIER().getText)
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeParser.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeParser.scala
index 987288b0f..5dececf20 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeParser.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/parser/trino/KyuubiTrinoFeParser.scala
@@ -56,4 +56,5 @@ class KyuubiTrinoFeParser extends
KyuubiParserBase[KyuubiTrinoFeBaseParser] {
}
override def parseTree(parser: KyuubiTrinoFeBaseParser): ParseTree =
parser.singleStatement()
+
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/trino/TrinoFeOperations.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/trino/TrinoFeOperations.scala
index 6136995ab..8d02a74c6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/trino/TrinoFeOperations.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/trino/TrinoFeOperations.scala
@@ -59,3 +59,18 @@ case class GetColumns(
case class GetPrimaryKeys() extends KyuubiTreeNode {
override def name(): String = "Get Primary Keys"
}
+
+case class ExecuteForPreparing(statementId: String, parameters: List[String])
+ extends KyuubiTreeNode {
+ override def name(): String = "Execute For Preparing"
+}
+
+case class Prepare(statementId: String, sql: String)
+ extends KyuubiTreeNode {
+ override def name(): String = "Prepare Sql"
+}
+
+case class Deallocate(statementId: String)
+ extends KyuubiTreeNode {
+ override def name(): String = "Deallocate Prepare"
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/parser/trino/KyuubiTrinoFeParserSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/parser/trino/KyuubiTrinoFeParserSuite.scala
index bbced0b61..205a6a7be 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/parser/trino/KyuubiTrinoFeParserSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/parser/trino/KyuubiTrinoFeParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.parser.trino
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.sql.parser.trino.KyuubiTrinoFeParser
import org.apache.kyuubi.sql.plan.{KyuubiTreeNode, PassThroughNode}
-import org.apache.kyuubi.sql.plan.trino.{GetCatalogs, GetColumns,
GetPrimaryKeys, GetSchemas, GetTables, GetTableTypes, GetTypeInfo}
+import org.apache.kyuubi.sql.plan.trino.{Deallocate, ExecuteForPreparing,
GetCatalogs, GetColumns, GetPrimaryKeys, GetSchemas, GetTables, GetTableTypes,
GetTypeInfo}
class KyuubiTrinoFeParserSuite extends KyuubiFunSuite {
val parser = new KyuubiTrinoFeParser()
@@ -369,4 +369,22 @@ class KyuubiTrinoFeParserSuite extends KyuubiFunSuite {
assert(kyuubiTreeNode.isInstanceOf[GetPrimaryKeys])
}
+
+ test("Support PreparedStatement for Trino Fe (ExecuteForPreparing)") {
+ val kyuubiTreeNode = parse(
+ """
+ | EXECUTE statement1 USING INTEGER '1'
+ |""".stripMargin)
+
+ assert(kyuubiTreeNode.isInstanceOf[ExecuteForPreparing])
+ }
+
+ test("Support PreparedStatement for Trino Fe (Deallocate)") {
+ val kyuubiTreeNode = parse(
+ """
+ | DEALLOCATE PREPARE statement1
+ |""".stripMargin)
+
+ assert(kyuubiTreeNode.isInstanceOf[Deallocate])
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
index 13e10a112..478bf9174 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoClientApiSuite.scala
@@ -114,6 +114,7 @@ class TrinoClientApiSuite extends KyuubiFunSuite with
TrinoRestFrontendTestHelpe
(false, List[List[Any]]())
}
}
+
Iterator.continually(getData(trino)).takeWhile(_._1).flatMap(_._2).toList
}