This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b50aaa [KYUUBI #2011] Introduce to very basic hive engine
2b50aaa is described below
commit 2b50aaa4ad4ad3fda95a6f6029b5b3fc3ba387f9
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 8 20:58:42 2022 +0800
[KYUUBI #2011] Introduce to very basic hive engine
### _Why are the changes needed?_
Support a lightweight hive engine which has some advantages over hiveserver2
- runs as a separate engine process under the multi-tenancy framework for better
stability, without requiring a heavyweight server
- more engines are being introduced in Kyuubi; Hive may be aged but is still widely used
- also useful for cross-verifying Spark/Flink etc. against
a stable Hive.
In this PR, a basic hive engine backend is introduced.
- only engine side
- only execute statement is supported
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2013 from yaooqinn/hiveengine.
Closes #2011
75d146bc [Kent Yao] Merge branch 'master' into hiveengine
a7152598 [Kent Yao] addressed
b6c8ef59 [Kent Yao] llap/shims -> jdk:tools
07a9843a [Kent Yao] hbase client -> jdk:tools
3acb4aa7 [Kent Yao] Merge branch 'master' into hiveengine
ef5bcb8a [Kent Yao] codecov
b09e46a3 [Kent Yao] hbase client -> jdk:tools
56f74d50 [Kent Yao] hive
dcacffea [Kent Yao] hive
c46ffc41 [Kent Yao] hive
2f883036 [Kent Yao] hive
186dd33d [Kent Yao] hive
5038505f [Kent Yao] hive
5bd2a849 [Kent Yao] hive
180d892d [Kent Yao] hive
568a52b9 [Kent Yao] init hive engine
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../engine/flink/session/FlinkSessionImpl.scala | 2 +-
externals/kyuubi-hive-sql-engine/pom.xml | 122 ++++++++++++++++++++
.../kyuubi/engine/hive/HiveBackendService.scala | 19 ++--
.../apache/kyuubi/engine/hive/HiveSQLEngine.scala | 91 +++++++++++++++
.../kyuubi/engine/hive/HiveTBinaryFrontend.scala | 25 +++--
.../engine/hive/operation/ExecuteStatement.scala | 34 ++++--
.../engine/hive/operation/HiveOperation.scala | 102 +++++++++++++++++
.../hive/operation/HiveOperationManager.scala | 124 +++++++++++++++++++++
.../engine/hive/session/HiveSessionImpl.scala} | 40 +++----
.../engine/hive/session/HiveSessionManager.scala | 117 +++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 40 +++++++
.../engine/hive/operation/HiveOperationSuite.scala | 50 +++++++++
.../engine/spark/session/SparkSessionImpl.scala | 4 +-
.../engine/trino/session/TrinoSessionImpl.scala | 3 -
.../kyuubi/operation/AbstractOperation.scala | 2 +-
.../apache/kyuubi/session/AbstractSession.scala | 1 +
.../apache/kyuubi/session/NoopSessionImpl.scala | 1 -
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 1 -
pom.xml | 1 +
19 files changed, 714 insertions(+), 65 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 9972642..9ab21ff 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,7 +30,7 @@ class FlinkSessionImpl(
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager,
- val handle: SessionHandle,
+ override val handle: SessionHandle,
val sessionContext: SessionContext)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
diff --git a/externals/kyuubi-hive-sql-engine/pom.xml
b/externals/kyuubi-hive-sql-engine/pom.xml
new file mode 100644
index 0000000..63cfa0a
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>kyuubi-parent</artifactId>
+ <groupId>org.apache.kyuubi</groupId>
+ <version>1.6.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kyuubi-hive-sql-engine_2.12</artifactId>
+ <name>Kyuubi Project Engine Hive SQL</name>
+ <packaging>jar</packaging>
+ <url>https://kyuubi.apache.org/</url>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>${hive.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-tez</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+
+</project>
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveBackendService.scala
similarity index 63%
copy from
kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
copy to
externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveBackendService.scala
index 6f76850..d850963 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveBackendService.scala
@@ -15,18 +15,13 @@
* limitations under the License.
*/
-package org.apache.kyuubi.session
+package org.apache.kyuubi.engine.hive
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.engine.hive.session.HiveSessionManager
+import org.apache.kyuubi.service.AbstractBackendService
+import org.apache.kyuubi.session.SessionManager
-class NoopSessionImpl(
- protocol: TProtocolVersion,
- user: String,
- password: String,
- ipAddress: String,
- conf: Map[String, String],
- sessionManager: SessionManager)
- extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override lazy val handle: SessionHandle = SessionHandle(protocol)
- override def open(): Unit = {}
+class HiveBackendService(engine: HiveSQLEngine)
+ extends AbstractBackendService("HiveBackendService") {
+ override val sessionManager: SessionManager = new HiveSessionManager(engine)
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
new file mode 100644
index 0000000..237cc8b
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+
+import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
+import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.service.{AbstractBackendService,
AbstractFrontendService, Serverable}
+import org.apache.kyuubi.util.SignalRegister
+
+class HiveSQLEngine extends Serverable("HiveSQLEngine") {
+ override val backendService: AbstractBackendService = new
HiveBackendService(this)
+ override val frontendServices: Seq[AbstractFrontendService] =
+ Seq(new HiveTBinaryFrontend(this))
+
+ override def start(): Unit = {
+ super.start()
+ // Start engine self-terminating checker after all services are ready and
it can be reached by
+ // all servers in engine spaces.
+ backendService.sessionManager.startTerminatingChecker(() => stop())
+ }
+
+ override protected def stopServer(): Unit = {}
+}
+
+object HiveSQLEngine extends Logging {
+ var currentEngine: Option[HiveSQLEngine] = None
+ val hiveConf = new HiveConf()
+ val kyuubiConf = new KyuubiConf()
+
+ def startEngine(): HiveSQLEngine = {
+ kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY,
RetryPolicies.N_TIME.toString)
+
+ for ((k, v) <- kyuubiConf.getAll) {
+ hiveConf.set(k, v)
+ }
+
+ val isEmbeddedMetaStore = {
+ val msUri = hiveConf.getVar(ConfVars.METASTOREURIS)
+ val msConnUrl = hiveConf.getVar(ConfVars.METASTORECONNECTURLKEY)
+ (msUri == null || msUri.trim().isEmpty) &&
+ (msConnUrl != null && msConnUrl.startsWith("jdbc:derby"))
+ }
+ if (isEmbeddedMetaStore) {
+ hiveConf.setBoolean("hive.metastore.schema.verification", false)
+ hiveConf.setBoolean("datanucleus.schema.autoCreateAll", true)
+ hiveConf.set(
+ "hive.metastore.warehouse.dir",
+ Utils.createTempDir(namePrefix = "kyuubi_hive_warehouse").toString)
+ }
+
+ val engine = new HiveSQLEngine()
+ info(s"Starting ${engine.getName}")
+ engine.initialize(kyuubiConf)
+ engine.start()
+ Utils.addShutdownHook(() => engine.stop())
+ currentEngine = Some(engine)
+ engine
+ }
+
+ def main(args: Array[String]): Unit = {
+ SignalRegister.registerLogger(logger)
+ try {
+ startEngine()
+ } catch {
+ case t: Throwable =>
+ error(t)
+ currentEngine.foreach(_.stop())
+ }
+ }
+}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontend.scala
similarity index 61%
copy from
kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
copy to
externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontend.scala
index 6f76850..46b1f45 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontend.scala
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.kyuubi.session
+package org.apache.kyuubi.engine.hive
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
+import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
-class NoopSessionImpl(
- protocol: TProtocolVersion,
- user: String,
- password: String,
- ipAddress: String,
- conf: Map[String, String],
- sessionManager: SessionManager)
- extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override lazy val handle: SessionHandle = SessionHandle(protocol)
- override def open(): Unit = {}
+class HiveTBinaryFrontend(override val serverable: Serverable)
+ extends TBinaryFrontendService("HiveTBinaryFrontend") {
+
+ override lazy val discoveryService: Option[Service] = {
+ if (ServiceDiscovery.supportServiceDiscovery(conf)) {
+ Some(new EngineServiceDiscovery(this))
+ } else {
+ None
+ }
+ }
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/ExecuteStatement.scala
similarity index 53%
copy from
kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
copy to
externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/ExecuteStatement.scala
index 6f76850..8b32bf8 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/ExecuteStatement.scala
@@ -15,18 +15,28 @@
* limitations under the License.
*/
-package org.apache.kyuubi.session
+package org.apache.kyuubi.engine.hive.operation
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import scala.collection.JavaConverters._
-class NoopSessionImpl(
- protocol: TProtocolVersion,
- user: String,
- password: String,
- ipAddress: String,
- conf: Map[String, String],
- sessionManager: SessionManager)
- extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override lazy val handle: SessionHandle = SessionHandle(protocol)
- override def open(): Unit = {}
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class ExecuteStatement(
+ session: Session,
+ override val statement: String,
+ confOverlay: Map[String, String],
+ override val shouldRunAsync: Boolean,
+ queryTimeout: Long)
+ extends HiveOperation(OperationType.EXECUTE_STATEMENT, session) {
+ override val internalHiveOperation: Operation = {
+ delegatedOperationManager.newExecuteStatementOperation(
+ hive,
+ statement,
+ confOverlay.asJava,
+ shouldRunAsync,
+ queryTimeout)
+ }
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
new file mode 100644
index 0000000..b2477b0
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.operation
+
+import java.util.concurrent.Future
+
+import org.apache.hive.service.cli.{OperationState => HiveOperationState}
+import org.apache.hive.service.cli.operation.Operation
+import org.apache.hive.service.cli.operation.OperationManager
+import org.apache.hive.service.cli.session.{HiveSession, SessionManager =>
HiveSessionManager}
+import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
+import org.apache.kyuubi.operation.{AbstractOperation, FetchOrientation,
OperationState, OperationStatus}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.session.Session
+
+abstract class HiveOperation(opType: OperationType, session: Session)
+ extends AbstractOperation(opType, session) {
+
+ protected val hive: HiveSession = session.asInstanceOf[HiveSessionImpl].hive
+
+ protected def delegatedSessionManager: HiveSessionManager =
hive.getSessionManager
+
+ protected def delegatedOperationManager: OperationManager = {
+ delegatedSessionManager.getOperationManager
+ }
+
+ val internalHiveOperation: Operation
+
+ override def beforeRun(): Unit = {
+ setState(OperationState.RUNNING)
+ }
+
+ override def afterRun(): Unit = {
+ state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(OperationState.FINISHED)
+ }
+ }
+ }
+ override def runInternal(): Unit = {
+ internalHiveOperation.run()
+ }
+
+ override def getBackgroundHandle: Future[_] = {
+ internalHiveOperation.getBackgroundHandle
+ }
+
+ override def cancel(): Unit = {
+ internalHiveOperation.cancel(HiveOperationState.CANCELED)
+ }
+
+ override def close(): Unit = {
+ internalHiveOperation.close()
+ }
+
+ override def getStatus: OperationStatus = {
+ super.getStatus
+ val status = internalHiveOperation.getStatus
+ val state =
OperationState.withName(status.getState.name().stripSuffix("_STATE"))
+
+ OperationStatus(
+ state,
+ createTime,
+ status.getOperationStarted,
+ lastAccessTime,
+ status.getOperationCompleted,
+ status.getHasResultSet,
+ Option(status.getOperationException).map(KyuubiSQLException(_)))
+ }
+
+ override def getResultSetSchema: TTableSchema = {
+ internalHiveOperation.getResultSetSchema.toTTableSchema
+ }
+
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ val tOrder = FetchOrientation.toTFetchOrientation(order)
+ val hiveOrder =
org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
+ val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
+ rowSet.toTRowSet
+ }
+
+ override def isTimedOut: Boolean =
internalHiveOperation.isTimedOut(System.currentTimeMillis)
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
new file mode 100644
index 0000000..b6798cd
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.operation
+
+import java.sql.SQLException
+import java.util.List
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
+import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
+import org.apache.hive.service.rpc.thrift.TRowSet
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.operation.{Operation, OperationHandle,
OperationManager}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.session.Session
+
+class HiveOperationManager() extends OperationManager("HiveOperationManager") {
+ override def newExecuteStatementOperation(
+ session: Session,
+ statement: String,
+ confOverlay: Map[String, String],
+ runAsync: Boolean,
+ queryTimeout: Long): Operation = {
+ val operation = new ExecuteStatement(session, statement, confOverlay,
runAsync, queryTimeout)
+ addOperation(operation)
+ }
+
+ override def newGetTypeInfoOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetCatalogsOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetSchemasOperation(
+ session: Session,
+ catalog: String,
+ schema: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetTablesOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: List[String]): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetTableTypesOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetColumnsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def newGetFunctionsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ functionName: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
+
+ override def getOperationLogRowSet(
+ opHandle: OperationHandle,
+ order: FetchOrientation,
+ maxRows: Int): TRowSet = {
+ def getLogSchema: TableSchema = {
+ val schema = new Schema
+ val fieldSchema = new FieldSchema
+ fieldSchema.setName("operation_log")
+ fieldSchema.setType("string")
+ schema.addToFieldSchemas(fieldSchema)
+ new TableSchema(schema)
+ }
+
+ val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
+ val internalHiveOperation = operation.internalHiveOperation
+
+ val rowSet = RowSetFactory.create(getLogSchema,
operation.getProtocolVersion, false)
+ val operationLog = internalHiveOperation.getOperationLog
+ if (operationLog == null) {
+ throw KyuubiSQLException("Couldn't find log associated with operation
handle: " + opHandle)
+ }
+
+ try {
+ val logs = operationLog.readOperationLog(false, maxRows)
+ for (log <- logs.asScala) {
+ rowSet.addRow(Array(log))
+ }
+ } catch {
+ case e: SQLException =>
+ throw new KyuubiSQLException(e.getMessage, e.getCause)
+ }
+
+ rowSet.toTRowSet
+ }
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
similarity index 64%
copy from
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
copy to
externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
index 9972642..c797430 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
@@ -15,41 +15,43 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.flink.session
+package org.apache.kyuubi.engine.hive.session
-import org.apache.flink.table.client.gateway.{Executor, SqlExecutionException}
-import org.apache.flink.table.client.gateway.context.SessionContext
+import java.util.HashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.apache.hive.service.cli.session.HiveSession
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.session.{AbstractSession, SessionHandle,
SessionManager}
-class FlinkSessionImpl(
+class HiveSessionImpl(
protocol: TProtocolVersion,
user: String,
password: String,
+ serverIpAddress: String,
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager,
- val handle: SessionHandle,
- val sessionContext: SessionContext)
+ override val handle: SessionHandle,
+ val hive: HiveSession)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- def executor: Executor =
sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
-
- def sessionId: String = handle.identifier.toString
+ override def open(): Unit = {
+ val confClone = new HashMap[String, String]()
+ confClone.putAll(conf.asJava) // pass conf.asScala not support `put` method
+ hive.open(confClone)
+ }
- private def setModifiableConfig(key: String, value: String): Unit = {
+ override def close(): Unit = {
+ super.close()
try {
- sessionContext.set(key, value)
+ hive.close()
} catch {
- case e: SqlExecutionException => warn(e.getMessage)
- }
- }
-
- override def open(): Unit = {
- normalizedConf.foreach {
- case (key, value) => setModifiableConfig(key, value)
+ case e: HiveSQLException =>
+ error(s"Failed to close hive runtime session: ${e.getMessage}")
}
- super.open()
}
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
new file mode 100644
index 0000000..048158e
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.session
+
+import java.io.File
+import java.util.concurrent.Future
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.{SessionHandle => ImportedSessionHandle}
+import org.apache.hive.service.cli.session.{HiveSessionImpl =>
ImportedHiveSessionImpl, SessionManager => ImportedHiveSessionManager}
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.hive.HiveSQLEngine
+import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
+import org.apache.kyuubi.operation.OperationManager
+import org.apache.kyuubi.session.{CLIENT_IP_KEY, SessionHandle, SessionManager}
+
+class HiveSessionManager(engine: HiveSQLEngine) extends
SessionManager("HiveSessionManager") {
+ override protected def isServer: Boolean = false
+
+ override val operationManager: OperationManager = new HiveOperationManager()
+
+ private val internalSessionManager = new ImportedHiveSessionManager(null) {
+
+ /**
+ * Avoid unnecessary hive initialization
+ */
+ override def init(hiveConf: HiveConf): Unit = {
+ this.hiveConf = hiveConf
+ }
+
+ /**
+ * Avoid unnecessary hive initialization
+ */
+ override def start(): Unit = {}
+
+ /**
+ * Avoid unnecessary hive initialization
+ */
+ override def stop(): Unit = {}
+
+ /**
+ * Submit background operations with exec pool of Kyuubi impl
+ */
+ override def submitBackgroundOperation(r: Runnable): Future[_] = {
+ HiveSessionManager.this.submitBackgroundOperation(r)
+ }
+ }
+
+ override def openSession(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String]): SessionHandle = {
+ val sessionHandle = SessionHandle(protocol)
+ val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
+ info(s"Opening session for $user@$clientIp")
+ val hive = new ImportedHiveSessionImpl(
+ new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
+ protocol,
+ user,
+ password,
+ HiveSQLEngine.hiveConf,
+ ipAddress)
+ hive.setSessionManager(internalSessionManager)
+ hive.setOperationManager(internalSessionManager.getOperationManager)
+ operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new
File(dir)))
+ val session = new HiveSessionImpl(
+ protocol,
+ user,
+ password,
+ ipAddress,
+ clientIp,
+ conf,
+ this,
+ sessionHandle,
+ hive)
+ try {
+ session.open()
+ setSession(sessionHandle, session)
+ info(s"$user's session with $sessionHandle is opened, current opening
sessions" +
+ s" $getOpenSessionCount")
+ sessionHandle
+ } catch {
+ case e: Exception =>
+ session.close()
+ throw KyuubiSQLException(e)
+ }
+ }
+
+ override def closeSession(sessionHandle: SessionHandle): Unit = {
+ super.closeSession(sessionHandle)
+ if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
+ info("Session stopped due to shared level is Connection.")
+ engine.stop()
+ }
+ }
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/resources/log4j2-test.properties
b/externals/kyuubi-hive-sql-engine/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..0c18971
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/src/test/resources/log4j2-test.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log everything at DEBUG to the file target/unit-tests.log; the console only shows FATAL
+rootLogger.level = debug
+rootLogger.appenderRef.stdout.ref = STDOUT
+rootLogger.appenderRef.file.ref = File
+
+# Console Appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c: %m%n
+appender.console.filter.a.type = ThresholdFilter
+appender.console.filter.a.level = fatal
+
+# File Appender
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/unit-tests.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
+# Set the logger level of File Appender to DEBUG
+appender.file.filter.a.type = ThresholdFilter
+appender.file.filter.a.level = debug
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
new file mode 100644
index 0000000..280d297
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.operation
+
+import org.apache.kyuubi.engine.hive.HiveSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+/**
+ * Exercises basic statement execution over JDBC against a [[HiveSQLEngine]]
+ * that is started once for the whole suite.
+ */
+class HiveOperationSuite extends HiveJDBCTestHelper {
+
+  /** Starts the engine first so that it is available when the parent setup runs. */
+  override def beforeAll(): Unit = {
+    HiveSQLEngine.startEngine()
+    super.beforeAll()
+  }
+
+  /**
+   * JDBC URL built from the connection URL of the first frontend service of the
+   * current engine. Requires that `beforeAll` has already started the engine.
+   */
+  override protected def jdbcUrl: String = {
+    val connectionUrl = HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl
+    "jdbc:hive2://" + connectionUrl + "/;"
+  }
+
+  test("basic execute statements, create, insert query") {
+    // NOTE(review): the argument presumably names the table the helper cleans up - confirm.
+    withJdbcStatement("hive_engine_test") { statement =>
+      statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
+      statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
+
+      val resultSet = statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
+      assert(resultSet.next())
+      assert(resultSet.getInt("ID") === 1)
+      assert(resultSet.getString("VALUE") === "2")
+
+      // The first column is declared as int, so it must be reported as an INTEGER
+      // with precision 10 and scale 0.
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.INTEGER)
+      assert(metaData.getPrecision(1) === 10)
+      assert(metaData.getScale(1) === 0)
+    }
+  }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 1dc59ee..fd896a7 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -25,7 +25,7 @@ import
org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.events.EventLogging
import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle,
SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionManager}
class SparkSessionImpl(
protocol: TProtocolVersion,
@@ -37,7 +37,6 @@ class SparkSessionImpl(
sessionManager: SessionManager,
val spark: SparkSession)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override val handle: SessionHandle = SessionHandle(protocol)
private def setModifiableConfig(key: String, value: String): Unit = {
try {
@@ -73,5 +72,4 @@ class SparkSessionImpl(
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
}
-
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index 1b68fc7..9325f43 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -35,7 +35,6 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.trino.TrinoConf
import org.apache.kyuubi.engine.trino.TrinoContext
import org.apache.kyuubi.session.AbstractSession
-import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.session.SessionManager
class TrinoSessionImpl(
@@ -50,8 +49,6 @@ class TrinoSessionImpl(
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
- override val handle: SessionHandle = SessionHandle(protocol)
-
override def open(): Unit = {
normalizedConf.foreach {
case ("use:database", database) => clientSession =
createClientSession(database)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 9b6d73d..92b1188 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.session.Session
abstract class AbstractOperation(opType: OperationType, session: Session)
extends Operation with Logging {
- final private val createTime = System.currentTimeMillis()
+ final protected val createTime = System.currentTimeMillis()
final private val handle = OperationHandle(opType, session.protocol)
final private val operationTimeout: Long = {
session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 4af63dd..d74dcc2 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -34,6 +34,7 @@ abstract class AbstractSession(
val ipAddress: String,
val conf: Map[String, String],
val sessionManager: SessionManager) extends Session with Logging {
+ override val handle: SessionHandle = SessionHandle(protocol)
protected def logSessionInfo(msg: String): Unit = info(s"[$user:$ipAddress]
$handle - $msg")
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
index 6f76850..91548a9 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionImpl.scala
@@ -27,6 +27,5 @@ class NoopSessionImpl(
conf: Map[String, String],
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override lazy val handle: SessionHandle = SessionHandle(protocol)
override def open(): Unit = {}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 8d7b0ab..686c715 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -44,7 +44,6 @@ class KyuubiSessionImpl(
sessionManager: KyuubiSessionManager,
sessionConf: KyuubiConf)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
- override val handle: SessionHandle = SessionHandle(protocol)
private[kyuubi] val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
diff --git a/pom.xml b/pom.xml
index 978fd04..cac3c4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
<module>dev/kyuubi-codecov</module>
<module>externals/kyuubi-download</module>
<module>externals/kyuubi-flink-sql-engine</module>
+ <module>externals/kyuubi-hive-sql-engine</module>
<module>externals/kyuubi-spark-sql-engine</module>
<module>externals/kyuubi-trino-engine</module>
<module>integration-tests</module>