This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f333e6b [KYUUBI #2034] Hive Backend Engine - GetPrimaryKeys
f333e6b is described below
commit f333e6ba22556a02c893f0bbd68c7fe17469e09f
Author: KenjiFujima <[email protected]>
AuthorDate: Sun Mar 20 20:03:29 2022 +0800
[KYUUBI #2034] Hive Backend Engine - GetPrimaryKeys
### _Why are the changes needed?_
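Hive Backend Engine - GetPrimaryKeys: implement the GetPrimaryKeys metadata operation for the Hive engine and wire it through the Kyuubi server, Thrift frontend and REST API. The Flink, Spark and Trino engines report it as "feature not supported".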
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2177 from KenjiFujima/KYUUBI-2034.
Closes #2034
a5c99b6f [KenjiFujima] [KYUUBI #2034] Hive Backend Engine - GetPrimaryKeys
Authored-by: KenjiFujima <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../flink/operation/FlinkSQLOperationManager.scala | 9 +++++
.../engine/hive/operation/GetPrimaryKeys.scala | 38 ++++++++++++++++++++++
.../hive/operation/HiveOperationManager.scala | 9 +++++
.../engine/hive/operation/HiveOperationSuite.scala | 24 ++++++++++++++
.../spark/operation/SparkSQLOperationManager.scala | 9 +++++
.../trino/operation/TrinoOperationManager.scala | 8 +++++
.../apache/kyuubi/operation/OperationManager.scala | 5 +++
.../kyuubi/service/AbstractBackendService.scala | 10 ++++++
.../org/apache/kyuubi/service/BackendService.scala | 5 +++
.../apache/kyuubi/service/TFrontendService.scala | 15 +++++++--
.../apache/kyuubi/session/AbstractSession.scala | 9 +++++
.../scala/org/apache/kyuubi/session/Session.scala | 4 +++
.../kyuubi/operation/NoopOperationManager.scala | 10 ++++++
.../kyuubi/operation/SparkMetadataTests.scala | 11 +++++--
.../kyuubi/service/TFrontendServiceSuite.scala | 15 ++++++---
.../apache/kyuubi/metrics/MetricsConstants.scala | 1 +
.../kyuubi/client/KyuubiSyncThriftClient.scala | 14 ++++++++
.../apache/kyuubi/operation/GetPrimaryKeys.scala | 34 +++++++++++++++++++
.../kyuubi/operation/KyuubiOperationManager.scala | 9 +++++
.../kyuubi/server/BackendServiceMetric.scala | 10 ++++++
.../kyuubi/server/api/v1/SessionsResource.scala | 24 ++++++++++++++
.../org/apache/kyuubi/server/api/v1/dto.scala | 5 +++
.../server/api/v1/SessionsResourceSuite.scala | 8 +++++
23 files changed, 277 insertions(+), 9 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 1404db5..ff08b4d 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -22,6 +22,7 @@ import java.util.Locale
import scala.collection.JavaConverters._
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.engine.flink.result.Constants
@@ -124,4 +125,12 @@ class FlinkSQLOperationManager extends
OperationManager("FlinkSQLOperationManage
val op = new GetFunctions(session, catalogName, schemaName, functionName)
addOperation(op)
}
+
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetPrimaryKeys.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetPrimaryKeys.scala
new file mode 100644
index 0000000..7b030e4
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetPrimaryKeys.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.operation
+
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String)
+ extends HiveOperation(OperationType.GET_FUNCTIONS, session) {
+
+ override val internalHiveOperation: Operation =
+ delegatedOperationManager.newGetPrimaryKeysOperation(
+ hive,
+ catalogName,
+ schemaName,
+ tableName)
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index ef95dd5..fe68428 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -93,6 +93,15 @@ class HiveOperationManager() extends
OperationManager("HiveOperationManager") {
addOperation(operation)
}
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ val operation = new GetPrimaryKeys(session, catalogName, schemaName,
tableName)
+ addOperation(operation)
+ }
+
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 8d03ccd..6b17d8f 100644
---
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -237,6 +237,30 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
}
+ test("get primary keys") {
+ withDatabases("test_schema") { statement =>
+ statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
+ statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a
string, " +
+ "PRIMARY KEY(a) disable novalidate)")
+
+ try {
+ val meta = statement.getConnection.getMetaData
+ val resultSet = meta.getPrimaryKeys(null, "test_schema", "test_table")
+ val resultSetBuffer = ArrayBuffer[(String, String, String, String)]()
+ while (resultSet.next()) {
+ resultSetBuffer += Tuple4(
+ resultSet.getString(TABLE_CAT),
+ resultSet.getString(TABLE_SCHEM),
+ resultSet.getString(TABLE_NAME),
+ resultSet.getString(COLUMN_NAME))
+ }
+ assert(resultSetBuffer.contains((null, "test_schema", "test_table",
"a")))
+ } finally {
+ statement.execute("DROP TABLE test_schema.test_table")
+ }
+ }
+ }
+
test("basic execute statements, create, insert query") {
withJdbcStatement("hive_engine_test") { statement =>
statement.execute("CREATE TABLE hive_engine_test(id int, value string)
stored as orc")
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index 3abf3f2..c0c7899 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
@@ -131,4 +132,12 @@ class SparkSQLOperationManager private (name: String)
extends OperationManager(n
val op = new GetFunctions(session, catalogName, schemaName, functionName)
addOperation(op)
}
+
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index df4412e..9441024 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -96,4 +96,12 @@ class TrinoOperationManager extends
OperationManager("TrinoOperationManager") {
// TODO: Supports the GetFunctions operation when Trino supports the query
of the functions.
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index b950aa7..e14941c 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -70,6 +70,11 @@ abstract class OperationManager(name: String) extends
AbstractService(name) {
catalogName: String,
schemaName: String,
functionName: String): Operation
+ def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation
final def addOperation(operation: Operation): Operation = synchronized {
handleToOperation.put(operation.getHandle, operation)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index 00eba7b..985ce8c 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -121,6 +121,16 @@ abstract class AbstractBackendService(name: String)
.getFunctions(catalogName, schemaName, functionName)
}
+ override def getPrimaryKeys(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): OperationHandle = {
+ sessionManager
+ .getSession(sessionHandle)
+ .getPrimaryKeys(catalogName, schemaName, tableName)
+ }
+
override def getOperationStatus(operationHandle: OperationHandle):
OperationStatus = {
val operation =
sessionManager.operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync) {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 84f2746..18ce237 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -76,6 +76,11 @@ trait BackendService {
catalogName: String,
schemaName: String,
functionName: String): OperationHandle
+ def getPrimaryKeys(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): OperationHandle
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index d7af68d..d2d1269 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -365,8 +365,19 @@ abstract class TFrontendService(name: String)
override def GetPrimaryKeys(req: TGetPrimaryKeysReq): TGetPrimaryKeysResp = {
debug(req.toString)
val resp = new TGetPrimaryKeysResp
- val errStatus = KyuubiSQLException.featureNotSupported().toTStatus
- resp.setStatus(errStatus)
+ try {
+ val sessionHandle = SessionHandle(req.getSessionHandle)
+ val catalog = req.getCatalogName
+ val schema = req.getSchemaName
+ val table = req.getTableName
+ val opHandle = be.getPrimaryKeys(sessionHandle, catalog, schema, table)
+ resp.setOperationHandle(opHandle.toTOperationHandle)
+ resp.setStatus(OK_STATUS)
+ } catch {
+ case e: Exception =>
+ error("Error getting primary keys: ", e)
+ resp.setStatus(KyuubiSQLException.toTStatus(e))
+ }
resp
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index d74dcc2..d11e1aa 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -173,6 +173,15 @@ abstract class AbstractSession(
runOperation(operation)
}
+ override def getPrimaryKeys(
+ catalogName: String,
+ schemaName: String,
+ tableName: String): OperationHandle = {
+ val operation = sessionManager.operationManager
+ .newGetPrimaryKeysOperation(this, catalogName, schemaName, tableName)
+ runOperation(operation)
+ }
+
override def cancelOperation(operationHandle: OperationHandle): Unit =
withAcquireRelease() {
sessionManager.operationManager.cancelOperation(operationHandle)
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index 153d570..af84ad9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -71,6 +71,10 @@ trait Session {
catalogName: String,
schemaName: String,
functionName: String): OperationHandle
+ def getPrimaryKeys(
+ catalogName: String,
+ schemaName: String,
+ tableName: String): OperationHandle
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index 5b199b9..4288456 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -91,6 +91,16 @@ class NoopOperationManager extends OperationManager("noop") {
addOperation(operation)
}
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ val operation =
+ new NoopOperation(OperationType.GET_FUNCTIONS, session, schemaName ==
invalid)
+ addOperation(operation)
+ }
+
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
index f61df8c..66ed51b 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
@@ -21,8 +21,7 @@ import java.sql.{DatabaseMetaData, ResultSet, SQLException,
SQLFeatureNotSupport
import scala.util.Random
-import org.apache.kyuubi.KYUUBI_VERSION
-import org.apache.kyuubi.Utils
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Utils}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
// For both `in-memory` and `hive` external catalog
@@ -457,7 +456,13 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(metaData.getDefaultTransactionIsolation ===
java.sql.Connection.TRANSACTION_NONE)
assert(!metaData.supportsTransactions)
assert(!metaData.getProcedureColumns("", "%", "%", "%").next())
- intercept[SQLException](metaData.getPrimaryKeys("", "default", ""))
+ try {
+ assert(!metaData.getPrimaryKeys("", "default", "src").next())
+ } catch {
+ case e: Exception =>
+ assert(e.isInstanceOf[SQLException])
+
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
+ }
assert(!metaData.getImportedKeys("", "default", "").next())
intercept[SQLException] {
metaData.getCrossReference("", "default", "src", "", "default", "src2")
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index 4bf5dec..cfb99e1 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -324,10 +324,17 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
withSessionHandle { (client, handle) =>
val req = new TGetPrimaryKeysReq(handle)
val resp = client.GetPrimaryKeys(req)
- assert(resp.getOperationHandle === null)
- assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
- assert(resp.getStatus.getSqlState === "0A000")
- assert(resp.getStatus.getErrorMessage startsWith "feature not supported")
+ val opHandle = resp.getOperationHandle
+ assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS)
+ assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
+ checkOperationResult(client, opHandle)
+
+ req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
+ val resp1 = client.GetPrimaryKeys(req)
+ assert(resp1.getOperationHandle === null)
+ assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(resp1.getStatus.getSqlState === null)
+ assert(resp1.getStatus.getErrorMessage startsWith "Invalid
SessionHandle")
}
}
diff --git
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index f5f80e2..b722549 100644
---
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -61,6 +61,7 @@ object MetricsConstants {
final val BS_GET_TABLE_TYPES = BACKEND_SERVICE + "get_table_types"
final val BS_GET_COLUMNS = BACKEND_SERVICE + "get_columns"
final val BS_GET_FUNCTIONS = BACKEND_SERVICE + "get_functions"
+ final val BS_GET_PRIMARY_KEY = BACKEND_SERVICE + "get_primary_keys"
final val BS_GET_OPERATION_STATUS = BACKEND_SERVICE + "get_operation_status"
final val BS_CANCEL_OPERATION = BACKEND_SERVICE + "cancel_operation"
final val BS_CLOSE_OPERATION = BACKEND_SERVICE + "close_operation"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 0187ccb..98843f6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -183,6 +183,20 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
resp.getOperationHandle
}
+ def getPrimaryKeys(
+ catalogName: String,
+ schemaName: String,
+ tableName: String): TOperationHandle = {
+ val req = new TGetPrimaryKeysReq()
+ req.setSessionHandle(_remoteSessionHandle)
+ req.setCatalogName(catalogName)
+ req.setSchemaName(schemaName)
+ req.setTableName(tableName)
+ val resp = withLockAcquired(GetPrimaryKeys(req))
+ ThriftUtils.verifyTStatus(resp.getStatus)
+ resp.getOperationHandle
+ }
+
def getOperationStatus(operationHandle: TOperationHandle):
TGetOperationStatusResp = {
val req = new TGetOperationStatusReq(operationHandle)
val resp = withLockAcquired(GetOperationStatus(req))
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetPrimaryKeys.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetPrimaryKeys.scala
new file mode 100644
index 0000000..1f2838b
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetPrimaryKeys.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String)
+ extends KyuubiOperation(OperationType.GET_FUNCTIONS, session) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ _remoteOpHandle = client.getPrimaryKeys(catalogName, schemaName,
tableName)
+ } catch onError()
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 9035405..c4dd960 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -114,6 +114,15 @@ class KyuubiOperationManager private (name: String)
extends OperationManager(nam
addOperation(operation)
}
+ override def newGetPrimaryKeysOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): Operation = {
+ val operation = new GetPrimaryKeys(session, catalogName, schemaName,
tableName)
+ addOperation(operation)
+ }
+
def newLaunchEngineOperation(session: KyuubiSessionImpl, shouldRunAsync:
Boolean): Operation = {
val operation = new LaunchEngine(session, shouldRunAsync)
addOperation(operation)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
index aa4f183..8a2bc77 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
@@ -122,6 +122,16 @@ trait BackendServiceMetric extends BackendService {
}
}
+ abstract override def getPrimaryKeys(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): OperationHandle = {
+ MetricsSystem.timerTracing(MetricsConstants.BS_GET_PRIMARY_KEY) {
+ super.getPrimaryKeys(sessionHandle, catalogName, schemaName, tableName)
+ }
+ }
+
abstract override def getOperationStatus(operationHandle: OperationHandle):
OperationStatus = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) {
super.getOperationStatus(operationHandle)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index 2e4bf05..00a5463 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -327,4 +327,28 @@ private[v1] class SessionsResource extends
ApiRequestContext with Logging {
throw new NotFoundException(errorMsg)
}
}
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "Create an operation with GET_FUNCTIONS type")
+ @POST
+ @Path("{sessionHandle}/operations/primaryKeys")
+ def getPrimaryKeys(
+ @PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetPrimaryKeysRequest): OperationHandle = {
+ try {
+ fe.be.getPrimaryKeys(
+ parseSessionHandle(sessionHandleStr),
+ request.catalogName,
+ request.schemaName,
+ request.tableName)
+ } catch {
+ case NonFatal(e) =>
+ val errorMsg = "Error getting primary keys"
+ error(errorMsg, e)
+ throw new NotFoundException(errorMsg)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 5c38b48..eb4a5bf 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -71,6 +71,11 @@ case class GetFunctionsRequest(
schemaName: String,
functionName: String)
+case class GetPrimaryKeysRequest(
+ catalogName: String,
+ schemaName: String,
+ tableName: String)
+
case class OpActionRequest(action: String)
case class ResultSetMetaData(columns: Seq[ColumnDesc])
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index f46027b..d6fd60c 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -246,5 +246,13 @@ class SessionsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(200 == response.getStatus)
operationHandle = response.readEntity(classOf[OperationHandle])
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
+
+ val getPrimaryKeysReq = GetPrimaryKeysRequest("spark_catalog", "default",
"default")
+ response = webTarget.path(s"$pathPrefix/operations/primaryKeys")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getPrimaryKeysReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
}
}