This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6d645e3 [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
5e6d645e3 is described below
commit 5e6d645e3e99034692e461f953631124bee381e6
Author: Kent Yao <[email protected]>
AuthorDate: Mon Apr 25 10:11:08 2022 +0800
[KYUUBI #2445] Implement ApplicationManager and Yarn/ JPS-local Application
Operation
### _Why are the changes needed?_
Add KyuubiApplicationManager to SessionManager for application management;
it currently supports killing applications and getting application information.
The underlying cluster manager operations added in this PR are:
- local JVM processes, found via jps and terminated with SIGTERM (kill -15)
- YARN, via the YarnClient API
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2447 from yaooqinn/2445.
Closes #2445
77810390 [Kent Yao] address comment
aed3f251 [Kent Yao] address comment
0fad9419 [Kent Yao] address comment
67ec2500 [Kent Yao] address comment
800bedb5 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
1a4084ab [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
be58583a [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
e75e20e7 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
baac7f04 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
e3f5c29a [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
c81e5635 [Kent Yao] [KYUUBI #2445] Implement ApplicationManager and Yarn/
JPS-local Application Operation
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 5 +
.../org.apache.kyuubi.engine.ApplicationOperation | 19 +++
.../kyuubi/engine/ApplicationOperation.scala | 75 ++++++++++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 30 ++---
.../kyuubi/engine/JpsApplicationOperation.scala | 92 +++++++++++++++
.../kyuubi/engine/KyuubiApplicationManager.scala | 129 +++++++++++++++++++++
.../org/apache/kyuubi/engine/ProcBuilder.scala | 12 +-
.../kyuubi/engine/YarnApplicationOperation.scala | 107 +++++++++++++++++
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 33 +-----
.../kyuubi/engine/hive/HiveProcessBuilder.scala | 2 +-
.../scala/org/apache/kyuubi/engine/package.scala | 26 +++++
.../engine/spark/SparkBatchProcessBuilder.scala | 24 +---
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 65 +++--------
.../kyuubi/engine/trino/TrinoProcessBuilder.scala | 2 +-
.../kyuubi/operation/BatchJobSubmission.scala | 95 ++++++---------
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 1 -
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 3 +-
.../kyuubi/session/KyuubiSessionManager.scala | 3 +
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 24 ++--
.../org/apache/kyuubi/engine/EngineRefSuite.scala | 30 ++---
.../engine/JpsApplicationOperationSuite.scala | 98 ++++++++++++++++
.../engine/flink/FlinkProcessBuilderSuite.scala | 21 ----
.../spark/SparkBatchProcessBuilderSuite.scala | 78 -------------
.../spark/SparkProcessBuilderOnYarnSuite.scala | 71 ------------
.../operation/KyuubiBatchYarnClusterSuite.scala | 53 ++++++---
.../KyuubiOperationYarnClusterSuite.scala | 11 --
.../org/apache/kyuubi/server/MiniYarnService.scala | 35 ++----
27 files changed, 704 insertions(+), 440 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 85a7d65cb..5ce2c08d7 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -27,6 +27,7 @@ import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, SecurityUtil,
UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi.config.KyuubiConf
@@ -49,6 +50,10 @@ object KyuubiHadoopUtils {
hadoopConf
}
+  /**
+   * Builds a [[YarnConfiguration]] on top of the Hadoop configuration that
+   * `newHadoopConf` creates for the given Kyuubi configuration.
+   *
+   * @param conf the Kyuubi configuration passed to `newHadoopConf`
+   * @return a new [[YarnConfiguration]] wrapping that Hadoop configuration
+   */
+  def newYarnConfiguration(conf: KyuubiConf): YarnConfiguration = {
+    val hadoopConf = newHadoopConf(conf)
+    new YarnConfiguration(hadoopConf)
+  }
+
def getServerPrincipal(principal: String): String = {
SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
}
diff --git
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
new file mode 100644
index 000000000..90fa759e8
--- /dev/null
+++
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.engine.ApplicationOperation
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.engine.YarnApplicationOperation
+org.apache.kyuubi.engine.JpsApplicationOperation
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
new file mode 100644
index 000000000..fc083a336
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import org.apache.kyuubi.config.KyuubiConf
+
+/**
+ * Operations for inspecting and killing engine applications on one kind of
+ * cluster manager (or on the local machine), addressed by a unique tag.
+ */
+trait ApplicationOperation {
+
+ /**
+ * Step for initializing the instance.
+ *
+ * @param conf the Kyuubi configuration used to set the instance up
+ */
+ def initialize(conf: KyuubiConf): Unit
+
+ /**
+ * Step to clean up the instance.
+ */
+ def stop(): Unit
+
+ /**
+ * Called before the other methods so that unsupported instances can be
+ * skipped quickly.
+ *
+ * @param clusterManager the underlying cluster manager, or None/local for a
+ * local instance
+ * @return true if this instance can handle the given cluster manager
+ */
+ def isSupported(clusterManager: Option[String]): Boolean
+
+ /**
+ * Kill the app/engine by the unique application tag.
+ *
+ * @param tag the unique application tag for engine instance.
+ * For example,
+ * if the Hadoop Yarn is used, for spark applications,
+ * the tag will be preset via spark.yarn.tags
+ * @return a response telling whether the application was killed, together
+ * with a message describing how the kill went.
+ *
+ * @note For implementations, please suppress exceptions and always return
+ * KillResponse
+ */
+ def killApplicationByTag(tag: String): KillResponse
+
+ /**
+ * Get the engine/application status by the unique application tag.
+ *
+ * @param tag the unique application tag for engine instance.
+ * @return a map containing the application status; the keys are presumably
+ * the APP_*_KEY constants of the companion object (confirm per
+ * implementation)
+ */
+ def getApplicationInfoByTag(tag: String): Map[String, String]
+}
+
+/**
+ * Keys of the application info map and shared messages used by
+ * [[ApplicationOperation]] implementations.
+ */
+object ApplicationOperation {
+
+ /**
+ * Key for the identifier determined by cluster manager for the engine
+ */
+ val APP_ID_KEY = "id"
+ // Key for the application name
+ val APP_NAME_KEY = "name"
+ // Key for the application state
+ val APP_STATE_KEY = "state"
+ // Key for the application URL
+ val APP_URL_KEY = "url"
+ // Key for error/diagnostic text of the application
+ val APP_ERROR_KEY = "error"
+
+ // Message used in a KillResponse when no application matches the tag
+ val NOT_FOUND = "APPLICATION_NOT_FOUND"
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index ff5ea0063..a109d847e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine
-import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.util.Random
@@ -36,10 +35,8 @@ import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.hive.HiveProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.DiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ENGINE_REF_ID,
HA_ZK_NAMESPACE}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL,
ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
@@ -54,7 +51,8 @@ import org.apache.kyuubi.operation.log.OperationLog
private[kyuubi] class EngineRef(
conf: KyuubiConf,
user: String,
- engineRefId: String = UUID.randomUUID().toString)
+ engineRefId: String,
+ engineManager: KyuubiApplicationManager)
extends Logging {
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_ZK_NAMESPACE)
@@ -167,24 +165,21 @@ private[kyuubi] class EngineRef(
val builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
- // tag is a seq type with comma-separated
- conf.set(
- SparkProcessBuilder.TAG_KEY,
- conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ +
",").getOrElse("") +
- "KYUUBI," + engineRefId)
new SparkProcessBuilder(appUser, conf, extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
- // tag is a seq type with comma-separated
- conf.set(
- FlinkProcessBuilder.TAG_KEY,
- conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ +
",").getOrElse("") + "KYUUBI")
new FlinkProcessBuilder(appUser, conf, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, extraEngineLog)
case HIVE_SQL =>
new HiveProcessBuilder(appUser, conf, extraEngineLog)
}
+ // TODO: Better to do this inside ProcBuilder
+ KyuubiApplicationManager.tagApplication(
+ engineRefId,
+ builder.shortName,
+ builder.clusterManager(),
+ builder.conf)
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
@@ -204,10 +199,7 @@ private[kyuubi] class EngineRef(
}
}
if (started + timeout <= System.currentTimeMillis()) {
- val killMessage = engineType match {
- case SPARK_SQL => builder.killApplication(Left(engineRefId))
- case _ => builder.killApplication()
- }
+ val killMessage =
engineManager.killApplication(builder.clusterManager(), engineRefId)
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT,
appUser)))
throw KyuubiSQLException(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
new file mode 100644
index 000000000..40dd8ba74
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.nio.file.Paths
+
+import scala.sys.process._
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationOperation.NOT_FOUND
+
+/**
+ * [[ApplicationOperation]] for engines running as local JVM processes.
+ *
+ * Engines are located by scanning the output of `jps -ml` for the application
+ * tag and are terminated with `kill -15` (SIGTERM).
+ */
+class JpsApplicationOperation extends ApplicationOperation {
+  import scala.util.control.NonFatal
+
+  /** The `jps` executable that was probed successfully, or null if none could be run. */
+  private var runner: String = _
+
+  /**
+   * Resolves the `jps` executable from JAVA_HOME (or the `java.home` system property),
+   * falling back to `jps` on the PATH, and keeps it only if it can actually be executed.
+   */
+  override def initialize(conf: KyuubiConf): Unit = {
+    val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
+      .map(Paths.get(_, "bin", "jps").toString)
+      .getOrElse("jps")
+    runner =
+      try {
+        // Probe run: the output is irrelevant, only whether jps can be executed.
+        // A Seq command avoids whitespace splitting of paths that contain spaces.
+        Seq(jps).!!
+        jps
+      } catch {
+        case NonFatal(_) => null
+      }
+  }
+
+  /**
+   * Supported only when `jps` is usable and the cluster manager is absent or "local".
+   */
+  override def isSupported(clusterManager: Option[String]): Boolean = {
+    runner != null && clusterManager.forall(_ == "local")
+  }
+
+  /**
+   * Returns the first `jps -ml` output line ("<pid> <main class and args>") that
+   * contains the tag, if any.
+   */
+  private def getEngine(tag: String): Option[String] = {
+    if (runner == null || tag == null || tag.trim.isEmpty) {
+      // Without jps nothing can be found; an empty tag would match every process.
+      None
+    } else {
+      try {
+        // Use the executable that was probed in initialize() rather than whatever
+        // `jps` happens to be on the PATH, and match the tag literally.
+        Seq(runner, "-ml").lineStream_!.find(_.contains(tag))
+      } catch {
+        case NonFatal(_) => None
+      }
+    }
+  }
+
+  /** Splits a jps line into (pid, rest of the line); the rest keeps its leading space. */
+  private def splitIdAndCommand(idAndCmd: String): (String, String) = {
+    val separator = idAndCmd.indexOf(" ")
+    if (separator < 0) (idAndCmd, "") else idAndCmd.splitAt(separator)
+  }
+
+  /**
+   * Sends SIGTERM to the local process whose command line contains the tag.
+   *
+   * @return (true, message) only if `kill` exited with status 0; otherwise
+   *         (false, reason). Never throws for non-fatal errors.
+   */
+  override def killApplicationByTag(tag: String): KillResponse = {
+    getEngine(tag) match {
+      case Some(idAndCmd) =>
+        val (id, _) = splitIdAndCommand(idAndCmd)
+        try {
+          // Wait for kill and check its exit status so failures are not reported as success.
+          val exitCode = Seq("kill", "-15", id).!
+          if (exitCode == 0) {
+            (true, s"Succeeded to terminate: $idAndCmd")
+          } else {
+            (false, s"Failed to terminate: $idAndCmd, due to kill exited with code $exitCode")
+          }
+        } catch {
+          case NonFatal(e) =>
+            (false, s"Failed to terminate: $idAndCmd, due to ${e.getMessage}")
+        }
+      case None =>
+        (false, NOT_FOUND)
+    }
+  }
+
+  /**
+   * Reports the matching local process as RUNNING with its pid and command line,
+   * or just a FINISHED state when no process matches the tag.
+   */
+  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+    getEngine(tag) match {
+      case Some(idAndCmd) =>
+        val (id, cmd) = splitIdAndCommand(idAndCmd)
+        Map(
+          ApplicationOperation.APP_ID_KEY -> id,
+          ApplicationOperation.APP_NAME_KEY -> cmd,
+          ApplicationOperation.APP_STATE_KEY -> "RUNNING")
+      case None =>
+        Map(ApplicationOperation.APP_STATE_KEY -> "FINISHED")
+    }
+  }
+
+  /** Nothing to release: no long-lived resources are held. */
+  override def stop(): Unit = {}
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
new file mode 100644
index 000000000..4d5e488cb
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.AbstractService
+
+/**
+ * Service that delegates application kill/info requests to the
+ * [[ApplicationOperation]] implementations discovered through [[ServiceLoader]].
+ */
+class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
+
+  // TODO: maybe add a configuration is better
+  private val operations = {
+    ServiceLoader.load(classOf[ApplicationOperation], getClass.getClassLoader)
+      .iterator().asScala.toSeq
+  }
+
+  /**
+   * Initializes every discovered operation; a failing operation is logged and
+   * skipped so that it does not prevent the others from starting.
+   */
+  override def initialize(conf: KyuubiConf): Unit = {
+    operations.foreach { op =>
+      try {
+        op.initialize(conf)
+      } catch {
+        case NonFatal(e) =>
+          warn(s"Error starting ${op.getClass.getSimpleName}: ${e.getMessage}")
+      }
+    }
+    super.initialize(conf)
+  }
+
+  /** Stops every discovered operation, logging (not propagating) non-fatal failures. */
+  override def stop(): Unit = {
+    operations.foreach { op =>
+      try {
+        op.stop()
+      } catch {
+        case NonFatal(e) =>
+          warn(s"Error stopping ${op.getClass.getSimpleName}: ${e.getMessage}")
+      }
+    }
+    super.stop()
+  }
+
+  /**
+   * Asks each operation supporting `resourceManager`, in order, to kill the
+   * application with the given tag, stopping at the first success.
+   *
+   * @return (killed, message of the last operation tried); if no operation
+   *         supports `resourceManager`, (false, explanation).
+   */
+  def killApplication(resourceManager: Option[String], tag: String): KillResponse = {
+    val lastAttempt = operations.foldLeft(Option.empty[KillResponse]) {
+      // Already killed: do not consult the remaining operations.
+      case (done @ Some((true, _)), _) => done
+      case (_, op) if op.isSupported(resourceManager) => Some(op.killApplicationByTag(tag))
+      case (previous, _) => previous
+    }
+
+    val (killed, lastMessage) = lastAttempt.getOrElse((false, null))
+    val finalMessage =
+      if (lastMessage == null) {
+        s"No ${classOf[ApplicationOperation]} Service found in ServiceLoader" +
+          s" for $resourceManager"
+      } else {
+        lastMessage
+      }
+    (killed, finalMessage)
+  }
+
+  /**
+   * Gets the application info from the first operation supporting `clusterManager`.
+   *
+   * @return None if no operation supports `clusterManager` or if the operation
+   *         reports nothing for the tag (a null result is mapped to None rather
+   *         than leaking out as Some(null)).
+   */
+  def getApplicationInfo(
+      clusterManager: Option[String],
+      tag: String): Option[Map[String, String]] = {
+    operations.find(_.isSupported(clusterManager))
+      .flatMap(op => Option(op.getApplicationInfoByTag(tag)))
+  }
+}
+
+object KyuubiApplicationManager {
+
+  /**
+   * Appends "KYUUBI,<tag>" to the comma-separated tag list stored under `key`,
+   * keeping any tags that are already configured.
+   */
+  private def appendYarnTags(key: String, tag: String, conf: KyuubiConf): Unit = {
+    // The prefix must be unwrapped with getOrElse: interpolating the Option itself
+    // would put "Some(...)" or "None" into the tag list.
+    val originalTag = conf.getOption(key).map(_ + ",").getOrElse("")
+    val newTag = s"${originalTag}KYUUBI,$tag"
+    conf.set(key, newTag)
+  }
+
+  private def setupSparkYarnTag(tag: String, conf: KyuubiConf): Unit = {
+    appendYarnTags("spark.yarn.tags", tag, conf)
+  }
+
+  private def setupSparkK8sTag(tag: String, conf: KyuubiConf): Unit = {
+    conf.set("spark.kubernetes.driver.label.kyuubi_unique_tag", tag)
+  }
+
+  private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
+    // TODO: yarn.tags or flink.yarn.tags, the mess of flink settings confuses me now.
+    appendYarnTags("yarn.tags", tag, conf)
+  }
+
+  /**
+   * Add a unique tag on the application
+   * @param applicationTag a unique tag to identify application
+   * @param applicationType application short type, e.g. SPARK_SQL SPARK_BATCH is SPARK
+   * @param resourceManager yarn, kubernetes(k8s) etc.
+   * @param conf kyuubi conf instance in session layer
+   */
+  def tagApplication(
+      applicationTag: String,
+      applicationType: String,
+      resourceManager: Option[String],
+      conf: KyuubiConf): Unit = {
+    (applicationType.toUpperCase, resourceManager.map(_.toUpperCase())) match {
+      case ("SPARK", Some("YARN")) => setupSparkYarnTag(applicationTag, conf)
+      // NOTE(review): this only matches a resource manager that is exactly "k8s";
+      // confirm callers normalize masters of the form "k8s://...".
+      case ("SPARK", Some("K8S")) => setupSparkK8sTag(applicationTag, conf)
+      case ("SPARK", _) =>
+        // if the master is not identified ahead, add all tags
+        setupSparkYarnTag(applicationTag, conf)
+        setupSparkK8sTag(applicationTag, conf)
+      case ("FLINK", _) =>
+        // running flink on other platforms is not yet supported
+        setupFlinkYarnTag(applicationTag, conf)
+      // other engine types are running locally yet
+      case _ =>
+    }
+  }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 24b6f0365..7b7ff744d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -41,7 +41,7 @@ trait ProcBuilder {
* The short name of the engine process builder, we use this for form the
engine jar paths now
* see `mainResource`
*/
- protected def shortName: String
+ def shortName: String
/**
* executable, it is `JAVA_HOME/bin/java` by default
@@ -99,7 +99,7 @@ trait ProcBuilder {
protected def commands: Array[String]
- protected def conf: KyuubiConf
+ def conf: KyuubiConf
protected def env: Map[String, String] = conf.getEnvs
@@ -237,12 +237,6 @@ trait ProcBuilder {
process
}
- /**
- * Use Left to represent engineRefId and Right to represent line.
- */
- def killApplication(clue: Either[String, String] =
Right(lastRowsOfLog.toArray.mkString("\n")))
- : String = ""
-
def close(): Unit = synchronized {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
@@ -332,6 +326,8 @@ trait ProcBuilder {
s"deployment/settings.html#environments")
}
+ def clusterManager(): Option[String] = None
+
}
object ProcBuilder extends Logging {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
new file mode 100644
index 000000000..9cbdd4452
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.yarn.client.api.YarnClient
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+/**
+ * [[ApplicationOperation]] backed by a [[YarnClient]]: applications are looked up
+ * by their YARN application tag.
+ */
+class YarnApplicationOperation extends ApplicationOperation with Logging {
+
+  // Assigned in initialize() once the client has been started; null before that.
+  @volatile private var yarnClient: YarnClient = _
+
+  /** Creates, initializes and starts the YARN client from the Kyuubi configuration. */
+  override def initialize(conf: KyuubiConf): Unit = {
+    val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+    // YarnClient is thread-safe
+    val client = YarnClient.createYarnClient()
+    client.init(yarnConf)
+    client.start()
+    yarnClient = client
+    info(s"Successfully initialized yarn client: ${client.getServiceState}")
+  }
+
+  /** Supported when the client exists and the cluster manager is "yarn" (any case). */
+  override def isSupported(clusterManager: Option[String]): Boolean = {
+    val clientReady = yarnClient != null
+    clientReady && clusterManager.nonEmpty && "yarn".equalsIgnoreCase(clusterManager.get)
+  }
+
+  /**
+   * Kills the first application reported by YARN for the tag.
+   *
+   * @return (true, message) on success, (false, reason) if nothing matched or a
+   *         YARN call failed
+   * @throws IllegalStateException if the client has not been initialized
+   */
+  override def killApplicationByTag(tag: String): KillResponse = {
+    if (yarnClient == null) {
+      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    }
+    try {
+      val matched = yarnClient.getApplications(null, null, Set(tag).asJava)
+      matched.asScala.headOption match {
+        case None =>
+          (false, NOT_FOUND)
+        case Some(report) =>
+          try {
+            val applicationId = report.getApplicationId
+            yarnClient.killApplication(applicationId)
+            (true, s"Succeeded to terminate: $applicationId with $tag")
+          } catch {
+            case e: Exception =>
+              (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
+          }
+      }
+    } catch {
+      case e: Exception =>
+        (
+          false,
+          s"Failed to get while terminating application with tag $tag," +
+            s" due to ${e.getMessage}")
+    }
+  }
+
+  /**
+   * Describes the first application reported by YARN for the tag.
+   *
+   * @return a map keyed by the APP_*_KEY constants, or null when no application
+   *         carries the tag
+   * @throws IllegalStateException if the client has not been initialized
+   */
+  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+    if (yarnClient == null) {
+      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    }
+    debug(s"Getting application info from Yarn cluster by $tag tag")
+    val matched = yarnClient.getApplications(null, null, Set(tag).asJava)
+    matched.asScala.headOption match {
+      case None =>
+        debug(s"Application with tag $tag not found")
+        null
+      case Some(report) =>
+        val appInfo = Map(
+          APP_ID_KEY -> report.getApplicationId.toString,
+          APP_NAME_KEY -> report.getName,
+          APP_STATE_KEY -> report.getYarnApplicationState.toString,
+          APP_URL_KEY -> report.getTrackingUrl,
+          APP_ERROR_KEY -> report.getDiagnostics)
+        debug(s"Successfully got application info by $tag: " + appInfo.mkString(", "))
+        appInfo
+    }
+  }
+
+  /** Stops the YARN client if one was created; a failure is logged, not rethrown. */
+  override def stop(): Unit = {
+    Option(yarnClient).foreach { client =>
+      try {
+        client.stop()
+      } catch {
+        case e: Exception => error(e.getMessage)
+      }
+    }
+  }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index d63d073e9..978f9bdd7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -18,11 +18,8 @@
package org.apache.kyuubi.engine.flink
import java.io.{File, FilenameFilter}
-import java.lang.ProcessBuilder.Redirect
import java.nio.file.Paths
-import scala.collection.JavaConverters._
-
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
@@ -78,34 +75,6 @@ class FlinkProcessBuilder(
override protected def commands: Array[String] = Array(executable)
- override def killApplication(clue: Either[String, String]): String = clue
match {
- case Left(_) => ""
- case Right(line) => killApplicationByLog(line)
- }
-
- def killApplicationByLog(line: String =
lastRowsOfLog.toArray.mkString("\n")): String = {
- "Job ID: .*".r.findFirstIn(line) match {
- case Some(jobIdLine) =>
- val jobId = jobIdLine.split("Job ID: ")(1).trim
- env.get("FLINK_HOME") match {
- case Some(flinkHome) =>
- val pb = new ProcessBuilder("/bin/sh", s"$flinkHome/bin/flink",
"stop", jobId)
- pb.environment()
- .putAll(childProcEnv.asJava)
- pb.redirectError(Redirect.appendTo(engineLog))
- pb.redirectOutput(Redirect.appendTo(engineLog))
- val process = pb.start()
- process.waitFor() match {
- case id if id != 0 => s"Failed to kill Application $jobId,
please kill it manually. "
- case _ => s"Killed Application $jobId successfully. "
- }
- case None =>
- s"FLINK_HOME is not set! Failed to kill Application $jobId, please
kill it manually."
- }
- case None => ""
- }
- }
-
@VisibleForTesting
def FLINK_HOME: String = {
// prepare FLINK_HOME
@@ -137,7 +106,7 @@ class FlinkProcessBuilder(
}
}
- override protected def shortName: String = "flink"
+ override def shortName: String = "flink"
}
object FlinkProcessBuilder {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index 8f8ad0c33..3aa70800c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -104,5 +104,5 @@ class HiveProcessBuilder(
override def toString: String = commands.mkString("\n")
- override protected def shortName: String = "hive"
+ override def shortName: String = "hive"
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala
new file mode 100644
index 000000000..e1d5160e1
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+/**
+ * Package-level type aliases shared by the engine classes.
+ */
+package object engine {
+
+ /**
+ * Result of an attempt to kill an application:
+ * (killed or not, hint message)
+ */
+ type KillResponse = (Boolean, String)
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 108f13f64..5a5c23392 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -17,15 +17,11 @@
package org.apache.kyuubi.engine.spark
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.api.v1.BatchRequest
-import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkBatchProcessBuilder(
override val proxyUser: String,
@@ -73,23 +69,7 @@ class SparkBatchProcessBuilder(
override protected def module: String = "kyuubi-spark-batch-submit"
- private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
-
batchRequest.conf.get(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY))
match {
- case Some("yarn") =>
- val yarnClient = getYarnClient
- val yarnConf = new
YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
- yarnClient.init(yarnConf)
- yarnClient.start()
- try {
- val apps = yarnClient.getApplications(null, null,
Set(batchId).asJava)
- apps.asScala.headOption.map { applicationReport =>
- applicationReport.getApplicationId.toString ->
applicationReport.getTrackingUrl
- }
- } finally {
- yarnClient.stop()
- }
-
- case _ => None // TODO: Support other resource manager
- }
+ override def clusterManager(): Option[String] = {
+ batchRequest.conf.get(MASTER_KEY).orElse(defaultMaster)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index e249846a8..0dc9b09d5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -20,22 +20,16 @@ package org.apache.kyuubi.engine.spark
import java.io.{File, IOException}
import java.nio.file.Paths
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_TYPE}
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkProcessBuilder(
override val proxyUser: String,
@@ -45,8 +39,6 @@ class SparkProcessBuilder(
import SparkProcessBuilder._
- def getYarnClient: YarnClient = YarnClient.createYarnClient
-
private val sparkHome = getEngineHome(shortName)
override protected val executable: String = {
@@ -139,49 +131,28 @@ class SparkProcessBuilder(
}
}
- override def killApplication(clue: Either[String, String]): String = clue
match {
- case Left(engineRefId) => killApplicationByTag(engineRefId)
- case Right(_) => ""
- }
+ override def shortName: String = "spark"
- private def killApplicationByTag(engineRefId: String): String = {
- conf.getOption(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY))
match {
- case Some("yarn") =>
- var applicationId: ApplicationId = null
- val yarnClient = getYarnClient
- try {
- val yarnConf = new
YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
- yarnClient.init(yarnConf)
- yarnClient.start()
- val apps = yarnClient.getApplications(null, null,
Set(engineRefId).asJava)
- if (apps.isEmpty) return s"There are no Application tagged with
$engineRefId," +
- s" please kill it manually."
- applicationId = apps.asScala.head.getApplicationId
- yarnClient.killApplication(
- applicationId,
- s"Kyuubi killed this caused by:
Timeout(${conf.get(ENGINE_INIT_TIMEOUT)} ms) to" +
- s" launched ${conf.get(ENGINE_TYPE)} engine with $this.")
- s"Killed Application $applicationId tagged with $engineRefId
successfully."
- } catch {
- case e: Throwable =>
- s"Failed to kill Application $applicationId tagged with
$engineRefId," +
- s" please kill it manually. Caused by ${e.getMessage}."
- } finally {
- yarnClient.stop()
+ protected lazy val defaultMaster: Option[String] = {
+ val confDir = env.getOrElse(SPARK_CONF_DIR,
s"$sparkHome${File.separator}conf")
+ val defaults =
+ try {
+ val confFile = new
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
+ if (confFile.exists()) {
+ Utils.getPropertiesFromFile(Some(confFile))
+ } else {
+ Map.empty[String, String]
}
- case _ => "Kill Application only works with YARN, please kill it
manually." +
- s" Application tagged with $engineRefId"
- }
+ } catch {
+ case _: Exception =>
+ warn(s"Failed to load spark configurations from $confDir")
+ Map.empty[String, String]
+ }
+ defaults.get(MASTER_KEY)
}
- override protected def shortName: String = "spark"
-
- protected def getSparkDefaultsConf(): Map[String, String] = {
- val sparkDefaultsConfFile = env.get(SPARK_CONF_DIR)
- .orElse(Option(s"$sparkHome${File.separator}conf"))
- .map(_ + File.separator + SPARK_CONF_FILE_NAME)
- .map(new File(_)).filter(_.exists())
- Utils.getPropertiesFromFile(sparkDefaultsConfFile)
+ override def clusterManager(): Option[String] = {
+ conf.getOption(MASTER_KEY).orElse(defaultMaster)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
index 70cecb8f5..de3cbc5ca 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala
@@ -96,7 +96,7 @@ class TrinoProcessBuilder(
buffer.toArray
}
- override protected def shortName: String = "trino"
+ override def shortName: String = "trino"
override def toString: String = commands.mkString("\n")
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index d8e901187..9dc2c7a80 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -26,12 +26,12 @@ import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.{ApplicationOperation, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
-import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT,
FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.api.v1.BatchRequest
-import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
import org.apache.kyuubi.util.ThriftUtils
class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest:
BatchRequest)
@@ -43,12 +43,16 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
private lazy val _operationLog = OperationLog.createOperationLog(session,
getHandle)
+ private val applicationManager =
+
session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
+
private var builder: ProcBuilder = _
- @volatile
- private[kyuubi] var appIdAndUrl: Option[(String, String)] = None
+ private val batchId: String = session.handle.identifier.toString
- private var resultFetched: Boolean = _
+ private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
+ applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
+ }
private val applicationCheckInterval =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
@@ -57,7 +61,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(_operationLog)
- setHasResultSet(false)
+ setHasResultSet(true)
setState(OperationState.PENDING)
}
@@ -86,7 +90,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
new SparkBatchProcessBuilder(
session.user,
session.sessionConf,
- session.batchId,
+ batchId,
batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
getOperationLog)
@@ -97,34 +101,29 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
try {
info(s"Submitting ${batchRequest.batchType} batch job: $builder")
val process = builder.start
- while (appIdAndUrl.isEmpty) {
- try {
- builder match {
- case sparkBatchProcessBuilder: SparkBatchProcessBuilder =>
- sparkBatchProcessBuilder.getApplicationIdAndUrl() match {
- case Some(appInfo) => appIdAndUrl = Some(appInfo)
- case _ =>
- }
-
- case _ =>
- }
- } catch {
- case e: Exception => error(s"Failed to check batch application", e)
- }
+ var applicationStatus = currentApplicationState
+ while (applicationStatus.isEmpty) {
+ applicationStatus = currentApplicationState
Thread.sleep(applicationCheckInterval)
}
- process.waitFor()
- if (process.exitValue() != 0) {
- throw new KyuubiException(s"Process exit with value
${process.exitValue()}")
+ val state = applicationStatus.get(ApplicationOperation.APP_STATE_KEY)
+ if (state == "KILLED" || state == "FAILED") {
+ process.destroyForcibly()
+ throw new RuntimeException("Batch job failed:" +
applicationStatus.get.mkString(","))
+ } else {
+ process.waitFor()
+ if (process.exitValue() != 0) {
+ throw new KyuubiException(s"Process exit with value
${process.exitValue()}")
+ }
}
} finally {
builder.close()
}
}
- override def getResultSetSchema: TTableSchema = {
+ override val getResultSetSchema: TTableSchema = {
val schema = new TTableSchema()
- Seq("ApplicationId", "URL").zipWithIndex.foreach { case (colName,
position) =>
+ Seq("key", "value").zipWithIndex.foreach { case (colName, position) =>
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName(colName)
val tTypeDesc = new TTypeDesc()
@@ -137,40 +136,14 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
- validateDefaultFetchOrientation(order)
- assertState(OperationState.FINISHED)
- setHasResultSet(true)
- order match {
- case FETCH_NEXT => fetchNext()
- case FETCH_PRIOR => resultSet
- case FETCH_FIRST => resultSet
- }
- }
-
- private lazy val resultSet: TRowSet = {
- val tRow = new TRowSet(0, new JArrayList[TRow](1))
- val (appId, url) = appIdAndUrl.toSeq.unzip
-
- val tAppIdColumn = TColumn.stringVal(new TStringColumn(
- appId.asJava,
- ByteBuffer.allocate(0)))
-
- val tUrlColumn = TColumn.stringVal(new TStringColumn(
- url.asJava,
- ByteBuffer.allocate(0)))
-
- tRow.addToColumns(tAppIdColumn)
- tRow.addToColumns(tUrlColumn)
- tRow
- }
-
- private def fetchNext(): TRowSet = {
- if (!resultFetched) {
- resultFetched = true
- resultSet
- } else {
- ThriftUtils.EMPTY_ROW_SET
- }
+ currentApplicationState.map { state =>
+ val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
+ Seq(state.keys, state.values).map(_.toSeq.asJava).foreach { col =>
+ val tCol = TColumn.stringVal(new TStringColumn(col,
ByteBuffer.allocate(0)))
+ tRow.addToColumns(tCol)
+ }
+ tRow
+ }.getOrElse(ThriftUtils.EMPTY_ROW_SET)
}
override def close(): Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 8352cd031..7926e68c0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -38,7 +38,6 @@ class KyuubiBatchSessionImpl(
batchRequest: BatchRequest)
extends KyuubiSession(protocol, user, password, ipAddress, conf,
sessionManager) {
override val handle: SessionHandle =
sessionManager.newBatchSessionHandle(protocol)
- val batchId: String = handle.identifier.toString
// TODO: Support batch conf advisor
override val normalizedConf: Map[String, String] =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d0b4b865e..da7e6a61e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -64,7 +64,8 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
- val engine: EngineRef = new EngineRef(sessionConf, user)
+ val engine: EngineRef =
+ new EngineRef(sessionConf, user, handle.identifier.toString,
sessionManager.applicationManager)
private[kyuubi] val launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this,
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 97ba4fddf..851492cf1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.cli.HandleIdentifier
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.credentials.HadoopCredentialsManager
+import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
@@ -42,10 +43,12 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
val credentialsManager = new HadoopCredentialsManager()
// this lazy is must be specified since the conf is null when the class
initialization
lazy val sessionConfAdvisor: SessionConfAdvisor =
PluginLoader.loadSessionConfAdvisor(conf)
+ val applicationManager = new KyuubiApplicationManager()
private var limiter: Option[SessionLimiter] = None
override def initialize(conf: KyuubiConf): Unit = {
+ addService(applicationManager)
addService(credentialsManager)
val absPath =
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
_operationLogRoot = Some(absPath.toAbsolutePath.toString)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 633e8e8f0..d10443047 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.engine.YarnApplicationOperation
import org.apache.kyuubi.server.MiniYarnService
/**
@@ -27,28 +28,35 @@ import org.apache.kyuubi.server.MiniYarnService
* may be thrown `/bin/bash: /bin/java: No such file or directory`.
*/
trait WithKyuubiServerOnYarn extends WithKyuubiServer {
- protected val kyuubiServerConf: KyuubiConf
- protected val connectionConf: Map[String, String]
- private var miniYarnService: MiniYarnService = _
+ override protected val conf: KyuubiConf = new KyuubiConf()
- final override protected lazy val conf: KyuubiConf = {
- connectionConf.foreach { case (k, v) => kyuubiServerConf.set(k, v) }
- kyuubiServerConf
+ protected lazy val yarnOperation: YarnApplicationOperation = {
+ val operation = new YarnApplicationOperation()
+ operation.initialize(miniYarnService.getConf)
+ operation
}
+ protected var miniYarnService: MiniYarnService = _
+
override def beforeAll(): Unit = {
+ conf.set("spark.master", "yarn")
+ .set("spark.executor.instances", "1")
miniYarnService = new MiniYarnService()
- miniYarnService.initialize(new KyuubiConf(false))
+ miniYarnService.initialize(conf)
miniYarnService.start()
conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR",
miniYarnService.getHadoopConfDir)
super.beforeAll()
}
override def afterAll(): Unit = {
+ // stop kyuubi server
+ // stop yarn operation client
+ // stop yarn cluster
+ super.afterAll()
+ yarnOperation.stop()
if (miniYarnService != null) {
miniYarnService.stop()
miniYarnService = null
}
- super.afterAll()
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 8b82db990..548e4f13d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -65,7 +65,7 @@ class EngineRefSuite extends KyuubiFunSuite {
Seq(None, Some("suffix")).foreach { domain =>
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
- val engine = new EngineRef(conf, user, id)
+ val engine = new EngineRef(conf, user, id, null)
assert(engine.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}",
@@ -79,7 +79,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
- val appName = new EngineRef(conf, user, id)
+ val appName = new EngineRef(conf, user, id, null)
assert(appName.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL",
@@ -91,7 +91,7 @@ class EngineRefSuite extends KyuubiFunSuite {
k =>
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(k.key, "abc")
- val appName2 = new EngineRef(conf, user, id)
+ val appName2 = new EngineRef(conf, user, id, null)
assert(appName2.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}",
@@ -105,7 +105,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
- val engineRef = new EngineRef(conf, user, id)
+ val engineRef = new EngineRef(conf, user, id, null)
val primaryGroupName =
UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
assert(engineRef.engineSpace ===
DiscoveryPaths.makePath(
@@ -119,7 +119,7 @@ class EngineRefSuite extends KyuubiFunSuite {
k =>
conf.unset(k)
conf.set(k.key, "abc")
- val engineRef2 = new EngineRef(conf, user, id)
+ val engineRef2 = new EngineRef(conf, user, id, null)
assert(engineRef2.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}",
@@ -132,7 +132,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val userName = "Iamauserwithoutgroup"
val newUGI = UserGroupInformation.createRemoteUser(userName)
assert(newUGI.getGroupNames.isEmpty)
- val engineRef3 = new EngineRef(conf, userName, id)
+ val engineRef3 = new EngineRef(conf, userName, id, null)
assert(engineRef3.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
@@ -145,7 +145,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, SERVER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
- val appName = new EngineRef(conf, user, id)
+ val appName = new EngineRef(conf, user, id, null)
assert(appName.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
@@ -154,7 +154,7 @@ class EngineRefSuite extends KyuubiFunSuite {
assert(appName.defaultEngineName ===
s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
- val appName2 = new EngineRef(conf, user, id)
+ val appName2 = new EngineRef(conf, user, id, null)
assert(appName2.engineSpace ===
DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
@@ -169,31 +169,31 @@ class EngineRefSuite extends KyuubiFunSuite {
// set subdomain and disable engine pool
conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
conf.set(ENGINE_POOL_SIZE, -1)
- val engine1 = new EngineRef(conf, user, id)
+ val engine1 = new EngineRef(conf, user, id, null)
assert(engine1.subdomain === "abc")
// unset subdomain and disable engine pool
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, -1)
- val engine2 = new EngineRef(conf, user, id)
+ val engine2 = new EngineRef(conf, user, id, null)
assert(engine2.subdomain === "default")
// set subdomain and 1 <= engine pool size < threshold
conf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
conf.set(ENGINE_POOL_SIZE, 1)
- val engine3 = new EngineRef(conf, user, id)
+ val engine3 = new EngineRef(conf, user, id, null)
assert(engine3.subdomain === "abc")
// unset subdomain and 1 <= engine pool size < threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 3)
- val engine4 = new EngineRef(conf, user, id)
+ val engine4 = new EngineRef(conf, user, id, null)
assert(engine4.subdomain.startsWith("engine-pool-"))
// unset subdomain and engine pool size > threshold
conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.set(ENGINE_POOL_SIZE, 100)
- val engine5 = new EngineRef(conf, user, id)
+ val engine5 = new EngineRef(conf, user, id, null)
val engineNumber = Integer.parseInt(engine5.subdomain.substring(12))
val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get
assert(engineNumber <= threshold)
@@ -203,7 +203,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val enginePoolName = "test-pool"
conf.set(ENGINE_POOL_NAME, enginePoolName)
conf.set(ENGINE_POOL_SIZE, 3)
- val engine6 = new EngineRef(conf, user, id)
+ val engine6 = new EngineRef(conf, user, id, null)
assert(engine6.subdomain.startsWith(s"$enginePoolName-"))
}
@@ -214,7 +214,7 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
- val engine = new EngineRef(conf, user, id)
+ val engine = new EngineRef(conf, user, id, null)
var port1 = 0
var port2 = 0
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
new file mode 100644
index 000000000..414fb35ec
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import java.lang.management.ManagementFactory
+import java.time.Duration
+import java.util.{ServiceLoader, UUID}
+
+import scala.collection.JavaConverters._
+import scala.sys.process._
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.SESSION_IDLE_TIMEOUT
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+
+class JpsApplicationOperationSuite extends KyuubiFunSuite {
+ private val operations = ServiceLoader.load(classOf[ApplicationOperation])
+
.asScala.filter(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation]))
+ private val jps = operations.head
+ jps.initialize(null)
+
+ test("JpsApplicationOperation with jstat") {
+ assert(jps.isSupported(None))
+ assert(jps.isSupported(Some("local")))
+ assert(!jps.killApplicationByTag(null)._1)
+ assert(!jps.killApplicationByTag("have a space")._1)
+ val currentProcess = ManagementFactory.getRuntimeMXBean.getName
+ val currentPid = currentProcess.splitAt(currentProcess.indexOf("@"))._1
+
+ new Thread {
+ override def run(): Unit = {
+ s"jstat -gcutil $currentPid 1000".!
+ }
+ }.start()
+
+ val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+ assert(desc1.contains("id"))
+ assert(desc1.contains("name"))
+ assert(desc1("state") === "RUNNING")
+
+ jps.killApplicationByTag("sun.tools.jstat.Jstat")
+
+ val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+ assert(!desc2.contains("id"))
+ assert(!desc2.contains("name"))
+ assert(desc2("state") === "FINISHED")
+ }
+
+ test("JpsApplicationOperation with spark local mode") {
+ val user = Utils.currentUser
+ val id = UUID.randomUUID().toString
+ val conf = new KyuubiConf()
+ .set("spark.abc", id)
+ .set("spark.master", "local")
+ .set(SESSION_IDLE_TIMEOUT, Duration.ofMinutes(3).toMillis)
+ val builder = new SparkProcessBuilder(user, conf)
+ builder.start
+
+ assert(jps.isSupported(builder.clusterManager()))
+ eventually(Timeout(10.seconds)) {
+ val desc1 = jps.getApplicationInfoByTag(id)
+ assert(desc1.contains("id"))
+ assert(desc1("name").contains(id))
+ assert(desc1("state") === "RUNNING")
+ }
+
+ val response = jps.killApplicationByTag(id)
+ assert(response._1, response._2)
+ assert(response._2 startsWith "Succeeded to terminate:")
+
+ val desc2 = jps.getApplicationInfoByTag(id)
+ assert(!desc2.contains("id"))
+ assert(!desc2.contains("name"))
+ assert(desc2("state") === "FINISHED")
+ val response2 = jps.killApplicationByTag(id)
+ assert(!response2._1)
+ assert(response2._2 === ApplicationOperation.NOT_FOUND)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 28ce47ba8..94c5cb450 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -28,25 +28,4 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
val commands = builder.toString.split(' ')
assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
}
-
- test("kill application") {
- val processBuilder = new FakeFlinkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
- }
- val exit1 = processBuilder.killApplication(
- Right("""
- |[INFO] SQL update statement has been successfully submitted to
the cluster:
- |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
- |""".stripMargin))
- assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
- && !exit1.contains("FLINK_HOME is not set!"))
-
- val exit2 = processBuilder.killApplication(Right("unknow"))
- assert(exit2.equals(""))
- }
-}
-
-class FakeFlinkProcessBuilder(config: KyuubiConf)
- extends FlinkProcessBuilder("fake", config) {
- override protected def commands: Array[String] = Array("ls")
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
deleted file mode 100644
index 2dd8a0e5e..000000000
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark
-
-import java.util.UUID
-
-import scala.concurrent.duration._
-
-import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.server.MiniYarnService
-import org.apache.kyuubi.server.api.v1.BatchRequest
-
-class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
- private val conf = KyuubiConf().set("kyuubi.on", "off")
- private var miniYarnService: MiniYarnService = _
-
- override def beforeAll(): Unit = {
- miniYarnService = new MiniYarnService()
- miniYarnService.initialize(new KyuubiConf(false))
- miniYarnService.start()
- conf.set(
- s"${KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX}.HADOOP_CONF_DIR",
- miniYarnService.getHadoopConfDir)
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- if (miniYarnService != null) {
- miniYarnService.stop()
- miniYarnService = null
- }
- super.afterAll()
- }
-
- test("spark batch process builder") {
- val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
-
- val batchRequest = BatchRequest(
- "spark",
- sparkProcessBuilder.mainResource.get,
- "kyuubi",
- sparkProcessBuilder.mainClass,
- "spark-batch-submission",
- Map("spark.master" -> "yarn"),
- Seq.empty[String])
-
- val builder = new SparkBatchProcessBuilder(
- batchRequest.proxyUser,
- conf,
- UUID.randomUUID().toString,
- batchRequest)
- val proc = builder.start
-
- eventually(timeout(3.minutes), interval(500.milliseconds)) {
- val applicationIdAndUrl = builder.getApplicationIdAndUrl()
- assert(applicationIdAndUrl.isDefined)
- assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
- assert(applicationIdAndUrl.exists(_._2.nonEmpty))
- }
- proc.destroyForcibly()
- }
-}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
deleted file mode 100644
index 245a10719..000000000
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderOnYarnSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark
-
-import java.util.UUID
-
-import scala.concurrent.duration.DurationInt
-
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.scalatestplus.mockito.MockitoSugar.mock
-
-import org.apache.kyuubi.{Utils, WithKyuubiServerOnYarn}
-import org.apache.kyuubi.config.KyuubiConf
-
-class SparkProcessBuilderOnYarnSuite extends WithKyuubiServerOnYarn {
-
- override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
-
- override protected val connectionConf: Map[String, String] = Map(
- "spark.master" -> "yarn",
- "spark.executor.instances" -> "1")
-
- test("test kill application") {
- val engineRefId = UUID.randomUUID().toString
-
- conf.set(
- SparkProcessBuilder.TAG_KEY,
- conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
- "KYUUBI," + engineRefId)
- val builder = new SparkProcessBuilder(Utils.currentUser, conf)
- val proc = builder.start
- eventually(timeout(3.minutes), interval(1.seconds)) {
- val killMsg = builder.killApplication(Left(engineRefId))
- assert(killMsg.contains(s"tagged with $engineRefId successfully."))
- }
- proc.destroyForcibly()
-
- val pb1 = new FakeSparkProcessBuilder(conf.clone) {
- override protected def env: Map[String, String] = Map()
- override def getYarnClient: YarnClient = mock[YarnClient]
- }
- val exit1 = pb1.killApplication(Left(engineRefId))
- assert(exit1.equals(s"There are no Application tagged with $engineRefId," +
- s" please kill it manually."))
-
- val pb2 = new FakeSparkProcessBuilder(conf.clone) {
- override protected def env: Map[String, String] = Map()
- override def getYarnClient: YarnClient = mock[YarnClient]
- }
- pb2.conf.set("spark.master", "local")
- val exit2 = pb2.killApplication(Left(engineRefId))
- assert(exit2.equals("Kill Application only works with YARN, please kill it
manually." +
- s" Application tagged with $engineRefId"))
- }
-
-}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
index 28f434faf..b1748a2b9 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -26,14 +26,14 @@ import org.apache.kyuubi.WithKyuubiServerOnYarn
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.operation.OperationState.ERROR
import org.apache.kyuubi.server.api.v1.BatchRequest
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
- override protected val connectionConf: Map[String, String] = Map.empty
- override protected val kyuubiServerConf: KyuubiConf = {
- KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
+ override protected val conf: KyuubiConf = {
+ new KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
.set(BATCH_CONF_IGNORE_LIST, Seq("spark.master"))
}
@@ -55,7 +55,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
Seq.empty[String])
- val sessionHandle = sessionManager.openBatchSession(
+ val sessionHandle = sessionManager().openBatchSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
batchRequest.proxyUser,
"passwd",
@@ -67,20 +67,39 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
val batchJobSubmissionOp = session.batchJobSubmissionOp
- eventually(timeout(3.minutes), interval(500.milliseconds)) {
- val applicationIdAndUrl = batchJobSubmissionOp.appIdAndUrl
- assert(applicationIdAndUrl.isDefined)
- assert(applicationIdAndUrl.exists(_._1.startsWith("application_")))
- assert(applicationIdAndUrl.exists(_._2.nonEmpty))
-
- assert(batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
- val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 1)
- .getColumns.asScala
- val appId = resultColumns.apply(0).getStringVal.getValues.asScala.apply(0)
- val url = resultColumns.apply(1).getStringVal.getValues.asScala.apply(0)
- assert(appId === batchJobSubmissionOp.appIdAndUrl.get._1)
- assert(url === batchJobSubmissionOp.appIdAndUrl.get._2)
+ eventually(timeout(3.minutes), interval(50.milliseconds)) {
+ val state = batchJobSubmissionOp.currentApplicationState
+ assert(state.nonEmpty)
+ assert(state.exists(_("id").startsWith("application_")))
}
+
+ val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ assert(killResponse._1)
+ assert(killResponse._2 startsWith "Succeeded to terminate:")
+
+ val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+
+ assert(appInfo("state") === "KILLED")
+ assert(batchJobSubmissionOp.getStatus.state === ERROR)
+
+ val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
+ .getColumns.asScala
+
+ val keys = resultColumns.head.getStringVal.getValues.asScala
+ val values = resultColumns.apply(1).getStringVal.getValues.asScala
+ val rows = keys.zip(values).toMap
+ val appId = rows("id")
+ val appName = rows("name")
+ val appState = rows("state")
+ val appUrl = rows("url")
+ val appError = rows("error")
+
+ val state2 = batchJobSubmissionOp.currentApplicationState.get
+ assert(appId === state2("id"))
+ assert(appName === state2("name"))
+ assert(appState === state2("state"))
+ assert(appUrl === state2("url"))
+ assert(appError === state2("error"))
sessionManager.closeSession(sessionHandle)
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
index 7c2c32837..d708c2fd0 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala
@@ -18,22 +18,11 @@
package org.apache.kyuubi.operation
import org.apache.kyuubi.WithKyuubiServerOnYarn
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with SparkQueryTests {
override protected def jdbcUrl: String = getJdbcUrl
- override protected val kyuubiServerConf: KyuubiConf = {
- // TODO KYUUBI #745
- KyuubiConf().set(ENGINE_INIT_TIMEOUT, 600000L)
- }
-
- override protected val connectionConf: Map[String, String] = Map(
- "spark.master" -> "yarn",
- "spark.executor.instances" -> "1")
-
test("KYUUBI #527- Support test with mini yarn cluster") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("""SELECT "${spark.app.id}" as id""")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 356c135a7..1a73cc24c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.server
import java.io.{File, FileWriter}
-import java.net.{InetAddress, URLClassLoader}
+import java.net.InetAddress
import scala.collection.JavaConverters._
@@ -26,23 +26,15 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
-class MiniYarnService(name: String) extends AbstractService(name) {
- def this() = this(classOf[MiniYarnService].getSimpleName)
+class MiniYarnService extends AbstractService("TestMiniYarnService") {
- private var hadoopConfDir: File = _
- private var yarnConf: YarnConfiguration = _
- private var yarnCluster: MiniYARNCluster = _
-
- private val classLoader = Thread.currentThread().getContextClassLoader
-
- private def newYarnConfig(): YarnConfiguration = {
+ private val hadoopConfDir: File = Utils.createTempDir().toFile
+ private val yarnConf: YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
// Disable the disk utilization check to avoid the test hanging when people's disks are
// getting full.
@@ -77,41 +69,32 @@ class MiniYarnService(name: String) extends AbstractService(name) {
yarnConfig.set(s"hadoop.proxyuser.$currentUser.hosts", "*")
yarnConfig
}
+ private val yarnCluster: MiniYARNCluster = new MiniYARNCluster(getName, 1, 1, 1)
override def initialize(conf: KyuubiConf): Unit = {
- hadoopConfDir = Utils.createTempDir().toFile
- yarnConf = newYarnConfig()
- yarnCluster = new MiniYARNCluster(name, 1, 1, 1)
yarnCluster.init(yarnConf)
super.initialize(conf)
}
override def start(): Unit = {
yarnCluster.start()
- val config = yarnCluster.getConfig
- eventually(timeout(10.seconds), interval(100.milliseconds)) {
- config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0"
- }
- info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
saveHadoopConf()
super.start()
-
- val hadoopConfClassLoader = new URLClassLoader(Array(hadoopConfDir.toURI.toURL), classLoader)
- Thread.currentThread().setContextClassLoader(hadoopConfClassLoader)
}
override def stop(): Unit = {
if (yarnCluster != null) yarnCluster.stop()
- if (hadoopConfDir != null) hadoopConfDir.delete()
super.stop()
- Thread.currentThread().setContextClassLoader(classLoader)
}
private def saveHadoopConf(): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
yarnCluster.getConfig.iterator().asScala.foreach { kv =>
- configToWrite.set(kv.getKey, kv.getValue.replaceAll(hostName, "localhost"))
+ val key = kv.getKey
+ val value = kv.getValue.replaceAll(hostName, "localhost")
+ configToWrite.set(key, value)
+ getConf.set(key, value)
}
val writer = new FileWriter(new File(hadoopConfDir, "yarn-site.xml"))
configToWrite.writeXml(writer)