This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new be0b9c1 [KYUUBI #1169] Decouple EngineLoggingService From
SparkSQLEngine
be0b9c1 is described below
commit be0b9c1e69b4589fff6dedfa2fb68f97476371d7
Author: Kent Yao <[email protected]>
AuthorDate: Tue Sep 28 23:00:16 2021 +0800
[KYUUBI #1169] Decouple EngineLoggingService From SparkSQLEngine
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
1. Make EngineLoggingService tolerant of errors during creation, so that if any
error occurs, the main bootstrap code still proceeds.
2. Separate EngineLoggingService From SparkSQLEngine, and we will try to
start EngineLoggingService before SparkSQLEngine to capture all the lifecycle
statuses. In addition, add a shutdown hook that runs EngineLoggingService.stop
after SparkSQLEngine.stop.
3. Add try-catch around every step of SparkSQLEngine bootstrapping to produce
better diagnostic messages.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1171 from yaooqinn/1169.
Closes #1169
3a661555 [Kent Yao] ci
733bd1ed [Kent Yao] address comments
36ba1661 [Kent Yao] [KYUUBI #1169] Decouple EngineLoggingService From
SparkSQLEngine
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 87 ++++++++++++----------
.../engine/spark/events/EventLoggingService.scala | 28 +++----
.../spark/events/EventLoggingServiceSuite.scala | 2 +-
3 files changed, 62 insertions(+), 55 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index de160d0..d9941ba 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -20,12 +20,14 @@ package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.concurrent.CountDownLatch
+import scala.util.control.NonFatal
+
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.SparkSQLEngineListener
import org.apache.spark.kyuubi.ui.EngineTab
import org.apache.spark.sql.SparkSession
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
@@ -33,23 +35,18 @@ import
org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore,
EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
-import org.apache.kyuubi.service.{Serverable, ServiceState}
+import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngine") {
- lazy val engineStatus: EngineEvent = EngineEvent(this)
-
- private val eventLogging = new EventLoggingService(this)
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new
SparkThriftBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this, new
EngineEventsStore(conf))
spark.sparkContext.addSparkListener(listener)
- addService(eventLogging)
super.initialize(conf)
- eventLogging.onEvent(engineStatus.copy(state =
ServiceState.INITIALIZED.id))
}
override def start(): Unit = {
@@ -57,24 +54,11 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
// Start engine self-terminating checker after all services are ready and
it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker()
- eventLogging.onEvent(engineStatus.copy(state = ServiceState.STARTED.id))
- }
-
- override def stop(): Unit = {
- if (!state.equals(ServiceState.LATENT)) {
- eventLogging.onEvent(
- engineStatus.copy(state = ServiceState.STOPPED.id, endTime =
System.currentTimeMillis()))
- }
- super.stop()
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
-
- def engineId: String = {
-
spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId)
- }
}
object SparkSQLEngine extends Logging {
@@ -127,12 +111,37 @@ object SparkSQLEngine extends Logging {
def startEngine(spark: SparkSession): Unit = {
currentEngine = Some(new SparkSQLEngine(spark))
currentEngine.foreach { engine =>
- engine.initialize(kyuubiConf)
- engine.start()
+ // start event logging ahead so that we can capture all statuses
+ val eventLogging = new EventLoggingService(spark.sparkContext)
+ try {
+ eventLogging.initialize(kyuubiConf)
+ eventLogging.start()
+ } catch {
+ case NonFatal(e) =>
+ // Don't block the main process if the `EventLoggingService` failed
to start
+ warn(s"Failed to initialize EventLoggingService: ${e.getMessage}", e)
+ }
+
+ try {
+ engine.initialize(kyuubiConf)
+ EventLoggingService.onEvent(EngineEvent(engine))
+ } catch {
+ case t: Throwable =>
+ throw new KyuubiException(s"Failed to initialize SparkSQLEngine:
${t.getMessage}", t)
+ }
+ try {
+ engine.start()
+ EngineTab(engine)
+ val event = EngineEvent(engine)
+ info(event)
+ EventLoggingService.onEvent(event)
+ } catch {
+ case t: Throwable =>
+ throw new KyuubiException(s"Failed to start SparkSQLEngine:
${t.getMessage}", t)
+ }
// Stop engine before SparkContext stopped to avoid calling a stopped
SparkContext
- addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
- EngineTab(engine)
- info(engine.engineStatus)
+ addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 2)
+ addShutdownHook(() => eventLogging.stop(),
SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
}
}
@@ -141,22 +150,20 @@ object SparkSQLEngine extends Logging {
var spark: SparkSession = null
try {
spark = createSpark()
- startEngine(spark)
- // blocking main thread
- countDownLatch.await()
- } catch {
- case t: Throwable if currentEngine.isDefined =>
- currentEngine.foreach { engine =>
- val status =
- engine.engineStatus.copy(diagnostic = s"Error State SparkSQL
Engine ${t.getMessage}")
- if (!engine.getServiceState.equals(ServiceState.LATENT)) {
- EventLoggingService.onEvent(status)
- }
- error(status, t)
+ try {
+ startEngine(spark)
+ // blocking main thread
+ countDownLatch.await()
+ } catch {
+ case e: KyuubiException if currentEngine.isDefined =>
+ val engine = currentEngine.get
engine.stop()
- }
- case t: Throwable =>
- error("Create SparkSQL Engine Failed", t)
+ val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
+ EventLoggingService.onEvent(event)
+ error(event, e)
+ }
+ } catch {
+ case t: Throwable => error(s"Failed to instantiate SparkSession:
${t.getMessage}", t)
} finally {
if (spark != null) {
spark.stop()
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
index 568ade2..7539cce 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
@@ -17,29 +17,28 @@
package org.apache.kyuubi.engine.spark.events
+import org.apache.spark.SparkContext
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_LOGGERS
-import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH,
ENGINE_EVENT_LOGGERS}
import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
-import org.apache.kyuubi.events.AbstractEventLoggingService
-import org.apache.kyuubi.events.EventLoggerType
-import org.apache.kyuubi.events.JsonEventLogger
+import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType,
JsonEventLogger}
-class EventLoggingService(engine: SparkSQLEngine)
+class EventLoggingService(spark: SparkContext)
extends AbstractEventLoggingService[KyuubiSparkEvent] {
- override def initialize(conf: KyuubiConf): Unit = {
+ override def initialize(conf: KyuubiConf): Unit = synchronized {
conf.get(ENGINE_EVENT_LOGGERS)
.map(EventLoggerType.withName)
.foreach{
case EventLoggerType.SPARK =>
-
addEventLogger(SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext))
+ addEventLogger(SparkContextHelper.createSparkHistoryLogger(spark))
case EventLoggerType.JSON =>
- val jsonEventLogger = new
JsonEventLogger[KyuubiSparkEvent](KyuubiSparkUtil.engineId,
- ENGINE_EVENT_JSON_LOG_PATH,
engine.spark.sparkContext.hadoopConfiguration)
+ val jsonEventLogger = new JsonEventLogger[KyuubiSparkEvent](
+ spark.applicationAttemptId.getOrElse(spark.applicationId),
+ ENGINE_EVENT_JSON_LOG_PATH,
+ spark.hadoopConfiguration)
addService(jsonEventLogger)
addEventLogger(jsonEventLogger)
case logger =>
@@ -49,12 +48,13 @@ class EventLoggingService(engine: SparkSQLEngine)
super.initialize(conf)
}
- override def start(): Unit = {
- _service = Some(this)
+ override def start(): Unit = synchronized {
super.start()
+ // expose the event logging service only when the loggers successfully
start
+ _service = Some(this)
}
- override def stop(): Unit = {
+ override def stop(): Unit = synchronized {
_service = None
super.stop()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
index b8926bb..0ab844b 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
@@ -99,7 +99,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine
with JDBCTestUtils {
test("statementEvent: generate, dump and query") {
val statementEventPath = Paths.get(
- logRoot, "spark_statement", s"day=$currentDate", engine.engineId +
".json")
+ logRoot, "spark_statement", s"day=$currentDate",
spark.sparkContext.applicationId + ".json")
val sql = "select timestamp'2021-09-01';"
withSessionHandle { (client, handle) =>