This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 679aca5a6 [KYUUBI #5867] HiveEngine support run on YARN mode
679aca5a6 is described below
commit 679aca5a6c36494fa57198af16602a3d2ea5e26c
Author: yikaifei <[email protected]>
AuthorDate: Fri Dec 29 18:50:12 2023 +0800
[KYUUBI #5867] HiveEngine support run on YARN mode
# :mag: Description
## Issue References
This PR adds support for running the Hive engine in YARN mode, closing
https://github.com/apache/kyuubi/issues/5867.
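For reference, a minimal sketch of enabling the new mode through `KyuubiConf` (key names come from the `settings.md` changes in this patch; the values are illustrative, and in a real deployment they would normally be set in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`):

    import org.apache.kyuubi.config.KyuubiConf

    // Sketch only: enable the YARN deploy mode for the Hive engine.
    val conf = KyuubiConf()
      .set("kyuubi.engine.type", "HIVE_SQL")
      .set("kyuubi.engine.hive.deploy.mode", "yarn")   // default: local
      .set("kyuubi.engine.yarn.queue", "default")      // target YARN queue
      .set("kyuubi.engine.yarn.memory", "1024")        // AM container memory, in MB
      .set("kyuubi.engine.yarn.cores", "1")            // AM container vcores
      .set("kyuubi.engine.yarn.stagingDir", "/user/kyuubi/staging") // optional HDFS path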
## Describe Your Solution 🔧
Please include a summary of the change and which issue is fixed. Please
also include relevant motivation and context. List any dependencies that are
required for this change.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## Author Self Checklist
- [ ] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested
**Be nice. Be informative.**
Closes #5868 from Yikf/hive-on-yarn.
Closes #5867
44f7287f5 [yikaifei] fix
3c17d2c4a [yikaifei] fix test
5474ebfba [yikaifei] parse classpath
6b97c4213 [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
34a67b452 [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
5e5045e66 [yikaifei] fix app type
d1eb5aea7 [yikaifei] fix
d89d09cfe [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
1fa18ba1b [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
1b0b77f4d [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
2ed1d4492 [Cheng Pan] Update
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
98ff19ce6 [yikaifei] HiveEngine support run on YARN mode
Lead-authored-by: yikaifei <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/configuration/settings.md | 10 +
.../apache/kyuubi/engine/hive/HiveSQLEngine.scala | 9 +-
.../engine/hive/deploy/HiveYarnModeSubmitter.scala | 65 +++
.../hive/deploy/HiveYarnModeSubmitterSuite.scala | 39 ++
integration-tests/kyuubi-hive-it/pom.xml | 31 ++
.../KyuubiOperationHiveEngineYarnModeSuite.scala | 47 +++
.../src/main/scala/org/apache/kyuubi/Utils.scala | 11 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 81 ++++
.../apache/kyuubi/engine/deploy/DeployMode.scala | 34 ++
.../engine/deploy/yarn/ApplicationMaster.scala | 163 ++++++++
.../deploy/yarn/ApplicationMasterArguments.scala | 56 +++
.../deploy/yarn/EngineYarnModeSubmitter.scala | 435 +++++++++++++++++++++
.../org/apache/kyuubi/service/Serverable.scala | 2 +
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 16 +
.../deploy/yarn/EngineYarnModeSubmitterSuite.scala | 76 ++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../kyuubi/engine/KyuubiApplicationManager.scala | 9 +
.../kyuubi/engine/hive/HiveProcessBuilder.scala | 26 +-
.../engine/hive/HiveYarnModeProcessBuilder.scala | 161 ++++++++
.../WithKyuubiServerAndHadoopMiniCluster.scala | 74 ++++
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 +-
.../hive/HiveYarnModeProcessBuilderSuite.scala | 62 +++
.../org/apache/kyuubi/server/MiniDFSService.scala | 4 +-
.../org/apache/kyuubi/server/MiniYarnService.scala | 12 +-
24 files changed, 1412 insertions(+), 15 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index f0f436127..4e60ea52b 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -148,6 +148,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.flink.initialize.sql | SHOW DATABASES
| The initialize sql for Flink engine. It fallback to
`kyuubi.engine.initialize.sql`.
[...]
| kyuubi.engine.flink.java.options | <undefined>
| The extra Java options for the Flink SQL engine. Only effective in
yarn session mode.
[...]
| kyuubi.engine.flink.memory | 1g
| The heap memory for the Flink SQL engine. Only effective in yarn
session mode.
[...]
+| kyuubi.engine.hive.deploy.mode | LOCAL
| Configures the Hive engine deploy mode. The value can be 'local' or
'yarn'. In local mode, the engine operates on the same node as the
KyuubiServer. In YARN mode, the engine runs within the Application Master (AM)
container of YARN.
[...]
| kyuubi.engine.hive.event.loggers | JSON
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>JSON: the events will be
written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to
be done</li> <li>CUSTOM: to be done.</li></ul>
[...]
| kyuubi.engine.hive.extra.classpath | <undefined>
| The extra classpath for the Hive query engine, for configuring
location of the hadoop client jars and etc.
[...]
| kyuubi.engine.hive.java.options | <undefined>
| The extra Java options for the Hive query engine
[...]
@@ -204,7 +205,16 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.user.isolated.spark.session | true
| When set to false, if the engine is running in a group or server
share level, all the JDBC/ODBC connections will be isolated against the user.
Including the temporary views, function registries, SQL configuration, and the
current database. Note that, it does not affect if the share level is
connection or user.
[...]
| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M
| The interval to check if the user-isolated Spark session is timeout.
[...]
| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H
| If kyuubi.engine.user.isolated.spark.session is false, we will
release the Spark session if its corresponding user is inactive after this
configured timeout.
[...]
+| kyuubi.engine.yarn.app.name | (none)
| The YARN app name when the engine deploy mode is YARN.
[...]
+| kyuubi.engine.yarn.cores | 1
| kyuubi engine container core number when the engine deploy mode is
YARN.
[...]
+| kyuubi.engine.yarn.java.options | (none)
| The extra Java options for the AM when the engine deploy mode is
YARN.
[...]
+| kyuubi.engine.yarn.memory | 1024
| kyuubi engine container memory in mb when the engine deploy mode is
YARN.
[...]
+| kyuubi.engine.yarn.priority | (none)
| kyuubi engine yarn priority when the engine deploy mode is YARN.
[...]
+| kyuubi.engine.yarn.queue | default
| kyuubi engine yarn queue when the engine deploy mode is YARN.
[...]
+| kyuubi.engine.yarn.report.interval | PT1S
| The interval for reporting the status of the engine's YARN application.
[...]
+| kyuubi.engine.yarn.stagingDir | (none)
| Staging directory used while submitting the Kyuubi engine to YARN. It
should be an absolute path in HDFS.
[...]
| kyuubi.engine.yarn.submit.timeout | PT30S
| The engine submit timeout for YARN application.
[...]
+| kyuubi.engine.yarn.tags | (none)
| kyuubi engine yarn tags when the engine deploy mode is YARN.
[...]
### Event
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index f22e281fb..c489027f4 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -45,7 +45,10 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
super.start()
// Start engine self-terminating checker after all services are ready and
it can be reached by
// all servers in engine spaces.
- backendService.sessionManager.startTerminatingChecker(() => stop())
+ backendService.sessionManager.startTerminatingChecker(() => {
+ selfExist = true
+ stop()
+ })
}
override protected def stopServer(): Unit = {
@@ -151,7 +154,8 @@ object HiveSQLEngine extends Logging {
}
} catch {
- case t: Throwable => currentEngine match {
+ case t: Throwable =>
+ currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
@@ -160,6 +164,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
+ throw t
}
}
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
new file mode 100644
index 000000000..9d5126ad6
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive.deploy
+
+import java.io.File
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
+import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
+import org.apache.kyuubi.engine.hive.HiveSQLEngine
+
+object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {
+
+ def main(args: Array[String]): Unit = {
+ Utils.fromCommandLineArgs(args, kyuubiConf)
+ submitApplication()
+ }
+
+ override var engineType: String = "hive"
+
+ override def engineMainClass(): String = HiveSQLEngine.getClass.getName
+
+ /**
+ * Jar list for the Hive engine.
+ */
+ override def engineExtraJars(): Seq[File] = {
+ val hadoopCp = sys.env.get("HIVE_HADOOP_CLASSPATH")
+ val extraCp = kyuubiConf.get(ENGINE_HIVE_EXTRA_CLASSPATH)
+ val jars = new ListBuffer[File]
+ hadoopCp.foreach(cp => parseClasspath(cp, jars))
+ extraCp.foreach(cp => parseClasspath(cp, jars))
+ jars.toSeq
+ }
+
+ private[hive] def parseClasspath(classpath: String, jars: ListBuffer[File]):
Unit = {
+ classpath.split(":").filter(_.nonEmpty).foreach { cp =>
+ if (cp.endsWith("/*")) {
+ val dir = cp.substring(0, cp.length - 2)
+ new File(dir) match {
+ case f if f.isDirectory =>
+ f.listFiles().filter(_.getName.endsWith(".jar")).foreach(jars += _)
+ case _ =>
+ }
+ } else {
+ jars += new File(cp)
+ }
+ }
+ }
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitterSuite.scala
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitterSuite.scala
new file mode 100644
index 000000000..9621eb235
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitterSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive.deploy
+
+import java.io.File
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite,
SCALA_COMPILE_VERSION, Utils}
+
+class HiveYarnModeSubmitterSuite extends KyuubiFunSuite {
+ val hiveEngineHome: String =
Utils.getCodeSourceLocation(getClass).split("/target")(0)
+
+ test("hadoop class path") {
+ val jars = new ListBuffer[File]
+ val classpath =
+ s"$hiveEngineHome/target/scala-$SCALA_COMPILE_VERSION/jars/*:" +
+
s"$hiveEngineHome/target/kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+ HiveYarnModeSubmitter.parseClasspath(classpath, jars)
+ assert(jars.nonEmpty)
+ assert(jars.exists(
+ _.getName ==
s"kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"))
+ }
+
+}
diff --git a/integration-tests/kyuubi-hive-it/pom.xml
b/integration-tests/kyuubi-hive-it/pom.xml
index c4e9f320c..cdd9fa4d9 100644
--- a/integration-tests/kyuubi-hive-it/pom.xml
+++ b/integration-tests/kyuubi-hive-it/pom.xml
@@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <!-- YARN -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala
b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala
new file mode 100644
index 000000000..55943094f
--- /dev/null
+++
b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.it.hive.operation
+
+import org.apache.kyuubi.{HiveEngineTests, Utils,
WithKyuubiServerAndHadoopMiniCluster}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE,
KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
+import org.apache.kyuubi.engine.deploy.DeployMode
+
+class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
+ with WithKyuubiServerAndHadoopMiniCluster {
+
+ override protected val conf: KyuubiConf = {
+ val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
+ metastore.toFile.delete()
+ KyuubiConf()
+ .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
+ .set(ENGINE_TYPE, "HIVE_SQL")
+ .set(KyuubiConf.ENGINE_HIVE_DEPLOY_MODE, DeployMode.YARN.toString)
+ // increase this to 30s as hive session state and metastore client is
slow initializing
+ .setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
+ .set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf
+ .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY,
Math.min(getYarnMaximumAllocationMb, 1024))
+ .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
+ }
+
+ override protected def jdbcUrl: String = getJdbcUrl
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 896ed9df2..961a69ad0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}
+ /**
+ * List the files recursively in a directory.
+ */
+ def listFilesRecursively(file: File): Seq[File] = {
+ if (!file.isDirectory) {
+ file :: Nil
+ } else {
+ file.listFiles().flatMap(listFilesRecursively)
+ }
+ }
+
/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0eee9f47d..926776209 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
+import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
@@ -231,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
+ final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
@@ -2685,6 +2687,85 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_HIVE_DEPLOY_MODE: ConfigEntry[String] =
+ buildConf("kyuubi.engine.hive.deploy.mode")
+ .doc("Configures the hive engine deploy mode, The value can be 'local',
'yarn'. " +
+ "In local mode, the engine operates on the same node as the
KyuubiServer. " +
+ "In YARN mode, the engine runs within the Application Master (AM)
container of YARN. ")
+ .version("1.9.0")
+ .stringConf
+ .transformToUpperCase
+ .checkValue(
+ mode => Set("LOCAL", "YARN").contains(mode),
+ "Invalid value for 'kyuubi.engine.hive.deploy.mode'. Valid values are
'local', 'yarn'.")
+ .createWithDefault(DeployMode.LOCAL.toString)
+
+ val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.stagingDir")
+ .doc("Staging directory used while submitting kyuubi engine to YARN, " +
+ "It should be a absolute path in HDFS.")
+ .version("1.9.0")
+ .stringConf
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.yarn.report.interval")
+ .doc("Interval between reports of the current engine on yarn app
status.")
+ .version("1.9.0")
+ .timeConf
+ .checkValue(t => t > 0, "must be positive integer")
+ .createWithDefault(Duration.ofSeconds(1).toMillis)
+
+ val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
+ buildConf("kyuubi.engine.yarn.tags")
+ .doc(s"kyuubi engine yarn tags when the engine deploy mode is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .toSequence()
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.queue")
+ .doc(s"kyuubi engine yarn queue when the engine deploy mode is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createWithDefault("default")
+
+ val ENGINE_DEPLOY_YARN_MODE_PRIORITY: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.priority")
+ .doc(s"kyuubi engine yarn priority when the engine deploy mode is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_APP_NAME: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.app.name")
+ .doc(s"The YARN app name when the engine deploy mode is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.memory")
+ .doc(s"kyuubi engine container memory in mb when the engine deploy mode
is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(1024)
+
+ val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.cores")
+ .doc(s"kyuubi engine container core number when the engine deploy mode
is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(1)
+
+ val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.java.options")
+ .doc(s"The extra Java options for the AM when the engine deploy mode is
YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createOptional
+
val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn
session mode.")
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala
new file mode 100644
index 000000000..50aa3e4d6
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy
+
+object DeployMode extends Enumeration {
+ type DeployMode = Value
+ val
+ /**
+ * In this mode, the engine will be launched locally.
+ */
+ LOCAL,
+ /**
+ * In this mode, the engine will be launched on YARN.
+ */
+ YARN,
+ /**
+ * In this mode, the engine will be launched on Kubernetes.
+ */
+ KUBERNETES = Value
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 000000000..fe6dbbdcf
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy.yarn
+
+import java.io.{File, IOException}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.Serverable
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues
+import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
+
+object ApplicationMaster extends Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var yarnConf: YarnConfiguration = _
+
+ private val kyuubiConf = new KyuubiConf()
+
+ private var currentEngineMainClass: String = _
+
+ private var currentEngine: Serverable = _
+
+ private var finalMsg: String = _
+
+ @volatile private var registered: Boolean = false
+ @volatile private var unregistered: Boolean = false
+ @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
+ def main(args: Array[String]): Unit = {
+ try {
+ val amArgs = new ApplicationMasterArguments(args)
+ Utils.getPropertiesFromFile(Some(new
File(amArgs.propertiesFile))).foreach { case (k, v) =>
+ kyuubiConf.set(k, v)
+ }
+ currentEngineMainClass = amArgs.engineMainClass
+ yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
+ Utils.addShutdownHook(() => {
+ if (!unregistered) {
+ if (currentEngine != null && currentEngine.selfExist) {
+ finalMsg = "Kyuubi Application Master is shutting down."
+ finalStatus = FinalApplicationStatus.SUCCEEDED
+ } else {
+ finalMsg = "Kyuubi Application Master is shutting down with error."
+ finalStatus = FinalApplicationStatus.FAILED
+ }
+ cleanupStagingDir()
+ unregister(finalStatus, finalMsg)
+ }
+ })
+ runApplicationMaster()
+ } catch {
+ case t: Throwable =>
+ error("Error running ApplicationMaster", t)
+ finalStatus = FinalApplicationStatus.FAILED
+ finalMsg = t.getMessage
+ cleanupStagingDir()
+ unregister(finalStatus, finalMsg)
+ if (currentEngine != null) {
+ currentEngine.stop()
+ }
+ }
+ }
+
+ def runApplicationMaster(): Unit = {
+ initAmClient()
+
+ runEngine()
+
+ registerAM()
+ }
+
+ def runEngine(): Unit = {
+ val buffer = new ArrayBuffer[String]()
+ buffer ++= confKeyValues(kyuubiConf.getAll)
+
+ val instance = DynFields.builder()
+ .impl(currentEngineMainClass, "MODULE$")
+ .build[Object].get(null)
+ DynMethods.builder("main")
+ .hiddenImpl(currentEngineMainClass, classOf[Array[String]])
+ .buildChecked()
+ .invoke(instance, buffer.toArray)
+
+ currentEngine = DynFields.builder()
+ .hiddenImpl(currentEngineMainClass, "currentEngine")
+ .buildChecked[Option[Serverable]]()
+ .get(instance)
+ .get
+ }
+
+ def initAmClient(): Unit = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(yarnConf)
+ amClient.start()
+ }
+
+ def registerAM(): Unit = {
+ val frontendService = currentEngine.frontendServices.head
+ val trackingUrl = frontendService.connectionUrl
+ val (host, port) = resolveHostAndPort(trackingUrl)
+ info("Registering the HiveSQLEngine ApplicationMaster with tracking url " +
+ s"$trackingUrl, host = $host, port = $port")
+ synchronized {
+ amClient.registerApplicationMaster(host, port, trackingUrl)
+ registered = true
+ }
+ }
+
+ def unregister(status: FinalApplicationStatus, diagnostics: String): Unit = {
+ synchronized {
+ if (registered && !unregistered) {
+ info(s"Unregistering ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diagnostics message:
$msg)").getOrElse(""))
+ unregistered = true
+ amClient.unregisterApplicationMaster(status, diagnostics, "")
+ if (amClient != null) {
+ amClient.stop()
+ }
+ }
+ }
+ }
+
+ private def resolveHostAndPort(connectionUrl: String): (String, Int) = {
+ val strings = connectionUrl.split(":")
+ (strings(0), strings(1).toInt)
+ }
+
+ private def cleanupStagingDir(): Unit = {
+ val stagingDirPath = new
Path(System.getenv("KYUUBI_ENGINE_YARN_MODE_STAGING_DIR"))
+ try {
+ val fs = stagingDirPath.getFileSystem(yarnConf)
+ info("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ } catch {
+ case ioe: IOException =>
+ error("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMasterArguments.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMasterArguments.scala
new file mode 100644
index 000000000..d4be6afbf
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy.yarn
+
+import org.apache.kyuubi.Logging
+
+class ApplicationMasterArguments(val args: Array[String]) extends Logging {
+ var engineMainClass: String = null
+ var propertiesFile: String = null
+
+ parseArgs(args.toList)
+
+ private def parseArgs(inputArgs: List[String]): Unit = {
+ var args = inputArgs
+
+ while (args.nonEmpty) {
+ args match {
+ case ("--class") :: value :: tail =>
+ engineMainClass = value
+ args = tail
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ args = tail
+
+ case other =>
+ throw new IllegalArgumentException(s"Unrecognized option $other.")
+ }
+ }
+ validateRequiredArguments()
+ }
+
+ private def validateRequiredArguments(): Unit = {
+ if (engineMainClass == null) {
+ throw new IllegalArgumentException("No engine main class provided.")
+ }
+
+ if (propertiesFile == null) {
+ throw new IllegalArgumentException("No properties file provided.")
+ }
+ }
+}
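As a reading aid, a minimal sketch (engine class name and relative path are illustrative) of the `--class`/`--properties-file` pair this parser expects; it mirrors the AM container command assembled in `EngineYarnModeSubmitter.createContainerLaunchContext` further below:

    import org.apache.kyuubi.engine.deploy.yarn.ApplicationMasterArguments

    // Sketch only: the submitter passes the engine main object and the localized
    // kyuubi properties file to the ApplicationMaster via these two options.
    val args = Array(
      "--class", "org.apache.kyuubi.engine.hive.HiveSQLEngine$",
      "--properties-file", "__kyuubi_engine_conf__/__kyuubi_conf__.properties")
    val parsed = new ApplicationMasterArguments(args)
    assert(parsed.engineMainClass.endsWith("HiveSQLEngine$"))
    assert(parsed.propertiesFile.endsWith("__kyuubi_conf__.properties"))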
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
new file mode 100644
index 000000000..552a3158f
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy.yarn
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.{Locale, Properties}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+abstract class EngineYarnModeSubmitter extends Logging {
+
+ val KYUUBI_ENGINE_STAGING: String = ".kyuubiEngineStaging"
+
+ /*
+ * The following variables are used to describe the contents of the
+ * ApplicationMaster's working directory. The directory structure is as
follows:
+ *
+ * ApplicationMasterWorkDir/
+ * |-- __kyuubi_engine_conf__
+ * | |-- __hadoop_conf__
+ * | | |-- hadoop conf file1
+ * | | |-- hadoop conf file2
+ * | | `-- ...
+ * | `-- __kyuubi_conf__.properties
+ * `-- __kyuubi_engine_libs__
+ * |-- kyuubi_engine.jar
+ * `-- ...
+ */
+ val LOCALIZED_LIB_DIR = "__kyuubi_engine_libs__"
+ val LOCALIZED_CONF_DIR = "__kyuubi_engine_conf__"
+ val HADOOP_CONF_DIR = "__hadoop_conf__"
+ val KYUUBI_CONF_FILE = "__kyuubi_conf__.properties"
+
+ val STAGING_DIR_PERMISSION: FsPermission =
+ FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+ private val applicationMaster =
ApplicationMaster.getClass.getName.dropRight(1)
+
+ @volatile private var yarnClient: YarnClient = _
+ private var appId: ApplicationId = _
+
+ private[engine] var stagingDirPath: Path = _
+
+ val kyuubiConf = new KyuubiConf()
+
+ var yarnConf: Configuration = _
+ var hadoopConf: Configuration = _
+
+ var engineType: String
+
+ def engineMainClass(): String
+
+ /**
+ * The extra jars that will be added to the classpath of the engine.
+ */
+ def engineExtraJars(): Seq[File] = Seq.empty
+
+ protected def submitApplication(): Unit = {
+ yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
+ hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
+ try {
+ yarnClient = YarnClient.createYarnClient()
+ yarnClient.init(yarnConf)
+ yarnClient.start()
+
+ debug("Requesting a new application from cluster with %d NodeManagers"
+ .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+ val newApp = yarnClient.createApplication()
+ val newAppResponse = newApp.getNewApplicationResponse
+ appId = newAppResponse.getApplicationId
+
+ // The app staging dir is based on the STAGING_DIR configuration if configured,
+ // otherwise on the user's home directory.
+ val appStagingBaseDir =
kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_STAGING_DIR)
+ .map { new Path(_,
UserGroupInformation.getCurrentUser.getShortUserName) }
+ .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+ stagingDirPath = new Path(appStagingBaseDir,
buildPath(KYUUBI_ENGINE_STAGING, appId.toString))
+
+ // Set up the appropriate contexts to launch AM
+ val containerContext = createContainerLaunchContext()
+ val appContext = createApplicationSubmissionContext(newApp,
containerContext)
+
+ // Finally, submit and monitor the application
+ info(s"Submitting application $appId to ResourceManager")
+ yarnClient.submitApplication(appContext)
+ monitorApplication(appId)
+ } catch {
+ case e: Throwable =>
+ if (stagingDirPath != null) {
+ cleanupStagingDir()
+ }
+ throw new KyuubiException("Failed to submit application to YARN", e)
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.stop()
+ }
+ }
+ }
+
+ private def createContainerLaunchContext(): ContainerLaunchContext = {
+ info("Setting up container launch context for engine AM")
+ val env = setupLaunchEnv(kyuubiConf)
+ val localResources = prepareLocalResources(stagingDirPath, env)
+
+ val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+ amContainer.setLocalResources(localResources.asJava)
+ amContainer.setEnvironment(env.asJava)
+
+ val javaOpts = ListBuffer[String]()
+
+ val javaOptions = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS)
+ if (javaOptions.isDefined) {
+ javaOpts += javaOptions.get
+ }
+
+ val am = Seq(applicationMaster)
+
+ val engineClass = Seq("--class", engineMainClass())
+
+ val kyuubiConfProperties = Seq(
+ "--properties-file",
+ buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, KYUUBI_CONF_FILE))
+
+ val commands =
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
+ javaOpts ++ am ++ engineClass ++ kyuubiConfProperties ++
+ Seq(
+ "1>",
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>",
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ val printableCommands = commands.map(s => if (s == null) "null" else
s).toList
+ amContainer.setCommands(printableCommands.asJava)
+ info(s"Commands: ${printableCommands.mkString(" ")}")
+
+ amContainer
+ }
+
+ private def prepareLocalResources(
+ destDir: Path,
+ env: mutable.HashMap[String, String]): mutable.HashMap[String,
LocalResource] = {
+ info("Preparing resources for engine AM container")
+ // Upload kyuubi engine and the extra JAR to the remote file system if
necessary,
+ // and add them as local resources to the application master.
+ val fs = destDir.getFileSystem(hadoopConf)
+
+ val localResources = mutable.HashMap[String, LocalResource]()
+ FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
+
+ distributeJars(localResources, env)
+ distributeConf(localResources, env)
+ localResources
+ }
+
+ private def distributeJars(
+ localResources: mutable.HashMap[String, LocalResource],
+ env: mutable.HashMap[String, String]): Unit = {
+ val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
Utils.createTempDir().toFile)
+ val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+ try {
+ jarsStream.setLevel(0)
+ val jars = kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY)
+ val putedEntry = new ListBuffer[String]
+ jars.get.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach
{ path =>
+ val jars = Utils.listFilesRecursively(new File(path)) ++
engineExtraJars()
+ jars.foreach { f =>
+ if (!putedEntry.contains(f.getName) && f.isFile &&
+ f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
+ jarsStream.putNextEntry(new ZipEntry(f.getName))
+ Files.copy(f.toPath, jarsStream)
+ jarsStream.closeEntry()
+ putedEntry += f.getName
+ addClasspathEntry(buildPath(Environment.PWD.$$(),
LOCALIZED_LIB_DIR, f.getName), env)
+ }
+ }
+ }
+ putedEntry.clear()
+ } finally {
+ jarsStream.close()
+ }
+
+ distribute(
+ new Path(jarsArchive.getAbsolutePath),
+ resType = LocalResourceType.ARCHIVE,
+ destName = LOCALIZED_LIB_DIR,
+ localResources)
+ }
+
+ private def distributeConf(
+ localResources: mutable.HashMap[String, LocalResource],
+ env: mutable.HashMap[String, String]): Unit = {
+ val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
Utils.createTempDir().toFile)
+ val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
+ try {
+ confStream.setLevel(0)
+ val putedEntry = new ListBuffer[String]
+ def putEntry(f: File): Unit = {
+ if (!putedEntry.contains(f.getName) && f.isFile && f.canRead) {
+ confStream.putNextEntry(new
ZipEntry(s"$HADOOP_CONF_DIR/${f.getName}"))
+ Files.copy(f.toPath, confStream)
+ confStream.closeEntry()
+ putedEntry += f.getName
+ addClasspathEntry(
+ buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR,
HADOOP_CONF_DIR, f.getName),
+ env)
+ }
+ }
+ // Load configuration files with the following priority, de-duplicating by file name:
+ // hive configuration -> hadoop configuration -> yarn configuration
+ val hiveConf =
kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY)
+ listDistinctFiles(hiveConf.get).foreach(putEntry)
+ val hadoopConf =
kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY)
+ listDistinctFiles(hadoopConf.get).foreach(putEntry)
+ val yarnConf =
kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY)
+ listDistinctFiles(yarnConf.get).foreach(putEntry)
+
+ val properties = confToProperties(kyuubiConf)
+ writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
+ } finally {
+ confStream.close()
+ }
+
+ distribute(
+ new Path(confArchive.getAbsolutePath),
+ resType = LocalResourceType.ARCHIVE,
+ destName = LOCALIZED_CONF_DIR,
+ localResources)
+ }
+
+ def listDistinctFiles(archive: String): Seq[File] = {
+ val distinctFiles = new mutable.LinkedHashSet[File]
+ archive.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach {
path =>
+ val file = new File(path)
+ val files = Utils.listFilesRecursively(file)
+ files.foreach { f =>
+ if (f.isFile && f.canRead) {
+ distinctFiles += f
+ }
+ }
+ }
+ distinctFiles.groupBy(_.getName).map {
+ case (_, items) => items.head
+ }.toSeq
+ }
+
+ private def distribute(
+ srcPath: Path,
+ resType: LocalResourceType,
+ destName: String,
+ localResources: mutable.HashMap[String, LocalResource]): Unit = {
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
+ val destPath = new Path(stagingDirPath, srcPath.getName)
+ info(s"Copying $srcPath to $destPath")
+ fs.copyFromLocalFile(srcPath, destPath)
+ fs.setPermission(destPath, new FsPermission(STAGING_DIR_PERMISSION))
+
+ val destFs = FileSystem.get(destPath.toUri, hadoopConf)
+ val destStatus = destFs.getFileStatus(destPath)
+
+ val destResource = Records.newRecord(classOf[LocalResource])
+ destResource.setType(resType)
+ destResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ destResource.setResource(URL.fromPath(destPath))
+ destResource.setTimestamp(destStatus.getModificationTime)
+ destResource.setSize(destStatus.getLen)
+ localResources(destName) = destResource
+ }
+
+ private[kyuubi] def setupLaunchEnv(kyuubiConf: KyuubiConf):
mutable.HashMap[String, String] = {
+ info("Setting up the launch environment for engine AM container")
+ val env = new mutable.HashMap[String, String]()
+
+ kyuubiConf.getAll
+ .filter { case (k, _) =>
k.startsWith(KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX) }
+ .map { case (k, v) =>
+ (k.substring(KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX.length +
1), v)
+ }
+ .foreach { case (k, v) => KyuubiHadoopUtils.addPathToEnvironment(env, k,
v) }
+
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR), env)
+ env.put(
+ Environment.HADOOP_CONF_DIR.name(),
+ buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, HADOOP_CONF_DIR))
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR,
HADOOP_CONF_DIR), env)
+ env.put("KYUUBI_ENGINE_YARN_MODE_STAGING_DIR", stagingDirPath.toString)
+ env
+ }
+
+ private def createApplicationSubmissionContext(
+ newApp: YarnClientApplication,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext
= {
+
+ val appContext = newApp.getApplicationSubmissionContext
+
appContext.setApplicationName(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_APP_NAME)
+ .getOrElse(s"Apache Kyuubi $engineType Engine"))
+ appContext.setQueue(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_QUEUE))
+ appContext.setAMContainerSpec(containerContext)
+ kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_PRIORITY).foreach { appPriority =>
+ appContext.setPriority(Priority.newInstance(appPriority))
+ }
+ appContext.setApplicationType(engineType.toUpperCase(Locale.ROOT))
+
+ val allTags = new util.HashSet[String]
+ kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_TAGS).foreach { tags =>
+ allTags.addAll(tags.asJava)
+ }
+ appContext.setApplicationTags(allTags)
+ appContext.setMaxAppAttempts(1)
+
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemorySize(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_MEMORY))
+ capability.setVirtualCores(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_CORES))
+ debug(s"Created resource capability for AM request: $capability")
+ appContext.setResource(capability)
+
+ appContext
+ }
+
+ private def monitorApplication(appId: ApplicationId): Unit = {
+ val report = yarnClient.getApplicationReport(appId)
+ val state = report.getYarnApplicationState
+ info(s"Application report for $appId (state: $state)")
+ if (state == YarnApplicationState.FAILED || state ==
YarnApplicationState.KILLED) {
+ throw new KyuubiException(s"Application $appId finished with status:
$state")
+ }
+ }
+
+ private def cleanupStagingDir(): Unit = {
+ try {
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
+ if (fs.delete(stagingDirPath, true)) {
+ info(s"Deleted staging directory $stagingDirPath")
+ }
+ } catch {
+ case ioe: IOException =>
+ warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ /**
+ * Joins all the path components using Path.SEPARATOR.
+ */
+ private def buildPath(components: String*): String = {
+ components.mkString(Path.SEPARATOR)
+ }
+
+ /**
+ * Add the given path to the classpath entry of the given environment map.
+ * If the classpath is already set, this appends the new path to the
existing classpath.
+ */
+ private def addClasspathEntry(path: String, env: mutable.HashMap[String,
String]): Unit =
+ KyuubiHadoopUtils.addPathToEnvironment(env, Environment.CLASSPATH.name,
path)
+
+ private def confToProperties(conf: KyuubiConf): Properties = {
+ val props = new Properties()
+ conf.getAll.foreach { case (k, v) =>
+ props.setProperty(k, v)
+ }
+ props
+ }
+
+ def writePropertiesToArchive(props: Properties, name: String, out:
ZipOutputStream): Unit = {
+ out.putNextEntry(new ZipEntry(name))
+ val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+ props.store(writer, "Kyuubi configuration.")
+ writer.flush()
+ out.closeEntry()
+ }
+
+ def writeConfigurationToArchive(
+ conf: Configuration,
+ name: String,
+ out: ZipOutputStream): Unit = {
+ out.putNextEntry(new ZipEntry(name))
+ val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+ conf.writeXml(writer)
+ writer.flush()
+ out.closeEntry()
+ }
+}
+
+object EngineYarnModeSubmitter {
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY =
"kyuubi.engine.deploy.yarn.mode.jars"
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY =
+ "kyuubi.engine.deploy.yarn.mode.hiveConf"
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY =
+ "kyuubi.engine.deploy.yarn.mode.hadoopConf"
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY =
+ "kyuubi.engine.deploy.yarn.mode.yarnConf"
+
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR = ","
+}
+
+case class YarnAppReport(
+ appState: YarnApplicationState,
+ finalState: FinalApplicationStatus,
+ diagnostics: Option[String])
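A minimal sketch (paths illustrative) of the internal keys this submitter reads; the server-side process builder is expected to set them to comma-separated local paths, which the submitter lists recursively, zips, and uploads to the YARN staging directory:

    import org.apache.kyuubi.config.KyuubiConf
    import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._

    // Sketch only: jars end up in the __kyuubi_engine_libs__ archive, configuration
    // files in __kyuubi_engine_conf__/__hadoop_conf__.
    val conf = new KyuubiConf()
      .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY, "/opt/kyuubi/externals/engines/hive")
      .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY, "/etc/hive/conf")
      .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY, "/etc/hadoop/conf")
      .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY, "/etc/hadoop/conf")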
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
index 05ed3644c..84ec32b15 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
@@ -35,6 +35,8 @@ abstract class Serverable(name: String) extends
CompositeService(name) {
private val started = new AtomicBoolean(false)
+ var selfExist = false
+
val backendService: AbstractBackendService
val frontendServices: Seq[AbstractFrontendService]
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 4959c845d..2d9ea4a8a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
DataInputStream, Da
import java.util.{Base64, Map => JMap}
import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
@@ -29,6 +30,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi.Logging
@@ -98,4 +100,18 @@ object KyuubiHadoopUtils extends Logging {
None
}
}
+
+ /**
+ * Add a path variable to the given environment map.
+ * If the map already contains this key, append the value to the existing
value instead.
+ */
+ def addPathToEnvironment(env: HashMap[String, String], key: String, value:
String): Unit = {
+ val newValue =
+ if (env.contains(key)) {
+ env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
+ } else {
+ value
+ }
+ env.put(key, newValue)
+ }
}
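A minimal sketch of the appending behavior; `ApplicationConstants.CLASS_PATH_SEPARATOR` is the literal `<CPS>` placeholder that YARN expands to the node's path separator, which is also why the suite below splits the classpath on it:

    import scala.collection.mutable.HashMap

    import org.apache.kyuubi.util.KyuubiHadoopUtils

    val env = new HashMap[String, String]()
    KyuubiHadoopUtils.addPathToEnvironment(env, "CLASSPATH", "{{PWD}}/__kyuubi_engine_conf__")
    KyuubiHadoopUtils.addPathToEnvironment(env, "CLASSPATH", "{{PWD}}/__kyuubi_engine_libs__")
    // env("CLASSPATH") == "{{PWD}}/__kyuubi_engine_conf__<CPS>{{PWD}}/__kyuubi_engine_libs__"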
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitterSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitterSuite.scala
new file mode 100644
index 000000000..349c194e6
--- /dev/null
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitterSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy.yarn
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import
org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter.KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY
+
+class EngineYarnModeSubmitterSuite extends KyuubiFunSuite with Matchers {
+
+ val kyuubiHome: String =
Utils.getCodeSourceLocation(getClass).split("kyuubi-common").head
+
+ test("Classpath should contain engine jars dir and conf dir") {
+ val kyuubiConf = new KyuubiConf()
+ .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY, "mock.jar")
+
+ val env = MockEngineYarnModeSubmitter.setupLaunchEnv(kyuubiConf)
+ assert(env.contains(Environment.HADOOP_CONF_DIR.name()))
+
+ val cp = env("CLASSPATH").split(":|;|<CPS>")
+
+ assert(cp.length == 2)
+ cp should contain("{{PWD}}/__kyuubi_engine_conf__")
+ cp should contain("{{PWD}}/__kyuubi_engine_conf__/__hadoop_conf__")
+ }
+
+ test("container env should contain engine env") {
+ val kyuubiConf = new KyuubiConf()
+ .set(s"${KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX}.KYUUBI_HOME",
kyuubiHome)
+
+ val env = MockEngineYarnModeSubmitter.setupLaunchEnv(kyuubiConf)
+ assert(env.nonEmpty)
+ assert(env.contains("KYUUBI_HOME"))
+ assert(env("KYUUBI_HOME") == kyuubiHome)
+ }
+
+ test("distinct archive files") {
+ val targetJars: String = s"${Utils.getCodeSourceLocation(getClass)}"
+ // double the jars to make sure the distinct works
+ val archives = s"$targetJars,$targetJars"
+ val files = MockEngineYarnModeSubmitter.listDistinctFiles(archives)
+ val targetFiles = Utils.listFilesRecursively(new File(targetJars))
+ assert(targetFiles != null)
+ assert(targetFiles.length == files.length)
+ }
+
+}
+
+object MockEngineYarnModeSubmitter extends EngineYarnModeSubmitter {
+ override var engineType: String = "mock"
+
+ stagingDirPath = new Path("target/test-staging-dir")
+
+ override def engineMainClass(): String =
"org.apache.kyuubi.engine.deploy.Mock"
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e15645133..bd1a4b640 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -196,7 +196,7 @@ private[kyuubi] class EngineRef(
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case HIVE_SQL =>
conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME,
defaultEngineName)
- new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog,
defaultEngineName)
case JDBC =>
new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case CHAT =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index f8b640053..1afdcc3cf 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -103,6 +103,13 @@ object KyuubiApplicationManager {
conf.set(SparkProcessBuilder.TAG_KEY, newTag)
}
+ private def setupEngineYarnModeTag(tag: String, conf: KyuubiConf): Unit = {
+ val originalTag =
+ conf.getOption(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_TAGS.key).map(_ +
",").getOrElse("")
+ val newTag = s"${originalTag}KYUUBI" +
Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
+ conf.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_TAGS.key, newTag)
+ }
+
private def setupSparkK8sTag(tag: String, conf: KyuubiConf): Unit = {
conf.set("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY, tag)
}
@@ -182,6 +189,8 @@ object KyuubiApplicationManager {
// running flink on other platforms is not yet supported
setupFlinkYarnTag(applicationTag, conf)
// other engine types are running locally yet
+ case ("HIVE", Some("YARN")) =>
+ setupEngineYarnModeTag(applicationTag, conf)
case _ =>
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index d8e4454b6..2d4145ff5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -26,10 +26,12 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH,
ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME,
ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS,
ENGINE_HIVE_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
-import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
+import org.apache.kyuubi.engine.deploy.DeployMode
+import org.apache.kyuubi.engine.deploy.DeployMode.{LOCAL, YARN}
+import
org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_HADOOP_CLASSPATH_KEY
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandLineUtils._
@@ -45,7 +47,7 @@ class HiveProcessBuilder(
this(proxyUser, conf, "")
}
- private val hiveHome: String = getEngineHome(shortName)
+ protected val hiveHome: String = getEngineHome(shortName)
override protected def module: String = "kyuubi-hive-sql-engine"
@@ -113,7 +115,23 @@ class HiveProcessBuilder(
override def shortName: String = "hive"
}
-object HiveProcessBuilder {
+object HiveProcessBuilder extends Logging {
final val HIVE_HADOOP_CLASSPATH_KEY = "HIVE_HADOOP_CLASSPATH"
final val HIVE_ENGINE_NAME = "hive.engine.name"
+
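+ /**
+ * Picks the concrete builder based on the configured deploy mode: LOCAL runs the
+ * engine as a local process, while YARN delegates to [[HiveYarnModeProcessBuilder]].
+ */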
+ def apply(
+ appUser: String,
+ conf: KyuubiConf,
+ engineRefId: String,
+ extraEngineLog: Option[OperationLog],
+ defaultEngineName: String): HiveProcessBuilder = {
+ DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
+ case LOCAL => new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ case YARN =>
+ warn("Hive on YARN mode is experimental.")
+ conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
+ new HiveYarnModeProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala
new file mode 100644
index 000000000..ba842cbd4
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.{KyuubiException, Logging, SCALA_COMPILE_VERSION}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH,
ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID,
KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo,
KyuubiApplicationManager}
+import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
+import
org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_HADOOP_CLASSPATH_KEY
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.command.CommandLineUtils.{confKeyValue,
confKeyValues}
+
+/**
+ * A process builder for Hive on Yarn.
+ *
+ * It launches a new process on the Kyuubi server side to submit the Hive engine to YARN.
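+ *
+ * This builder is selected by [[HiveProcessBuilder.apply]] when
+ * [[KyuubiConf.ENGINE_HIVE_DEPLOY_MODE]] resolves to [[DeployMode.YARN]].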
+ */
+class HiveYarnModeProcessBuilder(
+ override val proxyUser: String,
+ override val conf: KyuubiConf,
+ override val engineRefId: String,
+ override val extraEngineLog: Option[OperationLog] = None)
+ extends HiveProcessBuilder(proxyUser, conf, engineRefId, extraEngineLog)
with Logging {
+
+ override protected def mainClass: String =
+ "org.apache.kyuubi.engine.hive.deploy.HiveYarnModeSubmitter"
+
+ override def isClusterMode(): Boolean = true
+
+ override def clusterManager(): Option[String] = Some("yarn")
+
+ override def appMgrInfo(): ApplicationManagerInfo =
ApplicationManagerInfo(clusterManager())
+
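+ // Builds the local java command that launches HiveYarnModeSubmitter; the jar and
+ // conf directory locations are forwarded to the submitter as --conf entries so the
+ // submitter can ship them to the YARN application.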
+ override protected val commands: Iterable[String] = {
+ KyuubiApplicationManager.tagApplication(engineRefId, shortName,
clusterManager(), conf)
+ val buffer = new ArrayBuffer[String]()
+ buffer += executable
+
+ val memory = conf.get(ENGINE_HIVE_MEMORY)
+ buffer += s"-Xmx$memory"
+ buffer += "-cp"
+
+ val classpathEntries = new util.LinkedHashSet[String]
+ classpathEntries.addAll(hiveConfFiles())
+ classpathEntries.addAll(hadoopConfFiles())
+ classpathEntries.addAll(yarnConfFiles())
+ classpathEntries.addAll(jarFiles(true))
+
+ buffer += classpathEntries.asScala.mkString(File.pathSeparator)
+ buffer += mainClass
+
+ buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)
+ buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId)
+
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY,
+
jarFiles(false).asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY,
+
hiveConfFiles().asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY,
+
hadoopConfFiles().asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY,
+
yarnConfFiles().asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+
+ buffer ++= confKeyValues(conf.getAll)
+
+ buffer
+ }
+
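+ // When `isClasspath` is true, directory entries get a wildcard suffix so they can be
+ // used on the submitter's JVM classpath; otherwise the plain directories are returned
+ // and forwarded via KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY.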
+ private def jarFiles(isClasspath: Boolean): util.LinkedHashSet[String] = {
+ val jarEntries = new util.LinkedHashSet[String]
+
+ mainResource.foreach(jarEntries.add)
+
+
jarEntries.add(s"$hiveHome${File.separator}lib${appendClasspathSuffix(isClasspath)}")
+
+ val hadoopCp = env.get(HIVE_HADOOP_CLASSPATH_KEY)
+ val extraCp = conf.get(ENGINE_HIVE_EXTRA_CLASSPATH)
+ // the classpath of the ApplicationMaster is resolved when submitting the hive engine to YARN.
+ if (isClasspath) {
+ extraCp.foreach(jarEntries.add)
+ hadoopCp.foreach(jarEntries.add)
+ }
+ if (hadoopCp.isEmpty && extraCp.isEmpty) {
+ warn(s"The conf of ${HIVE_HADOOP_CLASSPATH_KEY} and
${ENGINE_HIVE_EXTRA_CLASSPATH.key}" +
+ s" is empty.")
+ debug("Detected development environment")
+ mainResource.foreach { path =>
+ val devHadoopJars = Paths.get(path).getParent
+ .resolve(s"scala-$SCALA_COMPILE_VERSION")
+ .resolve("jars")
+ if (!Files.exists(devHadoopJars)) {
+ throw new KyuubiException(s"The path $devHadoopJars does not exist. " +
+ s"Please set $HIVE_HADOOP_CLASSPATH_KEY or ${ENGINE_HIVE_EXTRA_CLASSPATH.key} " +
+ "to configure the location of the hadoop client jars.")
+ }
+ jarEntries.add(s"$devHadoopJars${appendClasspathSuffix(isClasspath)}")
+ }
+ }
+
+ jarEntries
+ }
+
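+ // The HIVE_CONF_DIR / HADOOP_CONF_DIR / YARN_CONF_DIR locations below are added to
+ // the submitter classpath and forwarded through the *_CONF_KEY settings above.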
+ private def hiveConfFiles(): util.LinkedHashSet[String] = {
+ val confEntries = new util.LinkedHashSet[String]
+ confEntries.add(env.getOrElse(
+ "HIVE_CONF_DIR",
+ s"$hiveHome${File.separator}conf"))
+
+ confEntries
+ }
+
+ private def hadoopConfFiles(): util.LinkedHashSet[String] = {
+ val confEntries = new util.LinkedHashSet[String]
+ env.get("HADOOP_CONF_DIR").foreach(confEntries.add)
+
+ confEntries
+ }
+
+ private def yarnConfFiles(): util.LinkedHashSet[String] = {
+ val confEntries = new util.LinkedHashSet[String]
+ env.get("YARN_CONF_DIR").foreach(confEntries.add)
+
+ confEntries
+ }
+
+ private def appendClasspathSuffix(isClasspath: Boolean): String = {
+ if (isClasspath) {
+ s"${File.separator}*"
+ } else {
+ ""
+ }
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala
new file mode 100644
index 000000000..bd11de08d
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi
+
+import java.io.File
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.server.{MiniDFSService, MiniYarnService}
+
+trait WithKyuubiServerAndHadoopMiniCluster extends KyuubiFunSuite with
WithKyuubiServer {
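+ // Boots embedded HDFS (MiniDFSService) and YARN (MiniYarnService) clusters, writes
+ // their configuration into a temporary directory, and points engines at it via
+ // HADOOP_CONF_DIR / YARN_CONF_DIR.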
+
+ val kyuubiHome: String =
Utils.getCodeSourceLocation(getClass).split("integration-tests").head
+
+ override protected val conf: KyuubiConf = new KyuubiConf(false)
+
+ private val hadoopConfDir: File = Utils.createTempDir().toFile
+
+ protected var miniHdfsService: MiniDFSService = _
+
+ protected var miniYarnService: MiniYarnService = _
+
+ override def beforeAll(): Unit = {
+ miniHdfsService = new MiniDFSService()
+ miniHdfsService.initialize(conf)
+ miniHdfsService.start()
+
+ miniYarnService = new MiniYarnService()
+ miniYarnService.initialize(conf)
+ miniYarnService.start()
+
+ miniHdfsService.saveHadoopConf(hadoopConfDir)
+ miniYarnService.saveYarnConf(hadoopConfDir)
+
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.KYUUBI_HOME", kyuubiHome)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR",
hadoopConfDir.getAbsolutePath)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.YARN_CONF_DIR",
hadoopConfDir.getAbsolutePath)
+
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (miniYarnService != null) {
+ miniYarnService.stop()
+ miniYarnService = null
+ }
+ if (miniHdfsService != null) {
+ miniHdfsService.stop()
+ miniHdfsService = null
+ }
+ }
+
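+ // Returns the maximum container allocation (in MB) configured for the mini YARN
+ // cluster, defaulting to 1024.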
+ def getYarnMaximumAllocationMb: Int = {
+ require(miniYarnService != null, "MiniYarnService is not initialized")
+
miniYarnService.getYarnConf.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
1024)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 5a674d98f..8d3f7b17d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -54,7 +54,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer {
miniYarnService = new MiniYarnService()
miniYarnService.initialize(conf)
miniYarnService.start()
- conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR",
miniYarnService.getHadoopConfDir)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR",
miniYarnService.getYarnConfDir)
super.beforeAll()
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala
new file mode 100644
index 000000000..7c896309c
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilderSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import
org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter.{KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY,
KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY,
KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY}
+import
org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_HADOOP_CLASSPATH_KEY
+
+class HiveYarnModeProcessBuilderSuite extends KyuubiFunSuite {
+
+ test("hive yarn mode process builder") {
+ val conf = KyuubiConf().set("kyuubi.on", "off")
+ val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") {
+ override def env: Map[String, String] =
+ super.env + ("HIVE_CONF_DIR" -> "/etc/hive/conf") +
(HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop")
+ }
+ val commands = builder.toString.split('\n')
+ assert(commands.head.contains("bin/java"), "wrong exec")
+ assert(builder.toString.contains("--conf kyuubi.session.user=kyuubi"))
+ assert(commands.exists(ss => ss.contains("kyuubi-hive-sql-engine")),
"wrong classpath")
+ assert(builder.toString.contains("--conf kyuubi.on=off"))
+ assert(builder.toString.contains(
+ s"--conf $KYUUBI_ENGINE_DEPLOY_YARN_MODE_HIVE_CONF_KEY=/etc/hive/conf"))
+ }
+
+ test("hadoop conf dir") {
+ val conf = KyuubiConf().set("kyuubi.on", "off")
+ val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") {
+ override def env: Map[String, String] =
+ super.env + ("HADOOP_CONF_DIR" -> "/etc/hadoop/conf") +
+ (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop")
+ }
+ assert(builder.toString.contains(
+ s"--conf
$KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY=/etc/hadoop/conf"))
+ }
+
+ test("yarn conf dir") {
+ val conf = KyuubiConf().set("kyuubi.on", "off")
+ val builder = new HiveYarnModeProcessBuilder("kyuubi", conf, "") {
+ override def env: Map[String, String] =
+ super.env + ("YARN_CONF_DIR" -> "/etc/hadoop/yarn/conf") +
+ (HIVE_HADOOP_CLASSPATH_KEY -> "/hadoop")
+ }
+ assert(builder.toString.contains(
+ s"--conf
$KYUUBI_ENGINE_DEPLOY_YARN_MODE_YARN_CONF_KEY=/etc/hadoop/yarn/conf"))
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
index caacbb6bf..dbc20be87 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
@@ -60,7 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
s"NameNode address in configuration is " +
s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
super.start()
- saveHadoopConf()
+ saveHadoopConf(hadoopConfDir)
}
override def stop(): Unit = {
@@ -68,7 +68,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
super.stop()
}
- private def saveHadoopConf(): Unit = {
+ def saveHadoopConf(hadoopConfDir: File): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
hdfsConf.iterator().asScala.foreach { kv =>
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 68a175efc..deaeae3be 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.service.AbstractService
class MiniYarnService extends AbstractService("TestMiniYarnService") {
- private val hadoopConfDir: File = Utils.createTempDir().toFile
+ private val yarnConfDir: File = Utils.createTempDir().toFile
private var yarnConf: YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
// Disable the disk utilization check to avoid the test hanging when
people's disks are
@@ -82,7 +82,7 @@ class MiniYarnService extends
AbstractService("TestMiniYarnService") {
override def start(): Unit = {
yarnCluster.start()
- saveHadoopConf()
+ saveYarnConf(yarnConfDir)
super.start()
}
@@ -91,7 +91,7 @@ class MiniYarnService extends
AbstractService("TestMiniYarnService") {
super.stop()
}
- private def saveHadoopConf(): Unit = {
+ def saveYarnConf(yarnConfDir: File): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
yarnCluster.getConfig.iterator().asScala.foreach { kv =>
@@ -100,10 +100,12 @@ class MiniYarnService extends
AbstractService("TestMiniYarnService") {
configToWrite.set(key, value)
getConf.set(key, value)
}
- val writer = new FileWriter(new File(hadoopConfDir, "yarn-site.xml"))
+ val writer = new FileWriter(new File(yarnConfDir, "yarn-site.xml"))
configToWrite.writeXml(writer)
writer.close()
}
- def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
+ def getYarnConfDir: String = yarnConfDir.getAbsolutePath
+
+ def getYarnConf: YarnConfiguration = yarnConf
}