This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ed5704 [KYUUBI #983] Refactor the basic abstraction of frontend
service
4ed5704 is described below
commit 4ed570459050e753f537a7cf329fc7447f12f524
Author: yanghua <[email protected]>
AuthorDate: Mon Aug 30 09:34:12 2021 +0800
[KYUUBI #983] Refactor the basic abstraction of frontend service
…
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #984 from yanghua/KYUUBI-983.
Closes #983
db953b8a [yanghua] Rename KyuubiFrontendService to be KyuubiFrontendServices
8c3d7f2a [yanghua] [KYUUBI #983] Refactor the basic abstraction of frontend
service
Authored-by: yanghua <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/engine/spark/SparkSQLEngine.scala | 5 ++++-
.../apache/kyuubi/service/AbstractFrontendService.scala | 2 +-
.../main/scala/org/apache/kyuubi/service/Serverable.scala | 4 +---
.../test/scala/org/apache/kyuubi/service/NoopServer.scala | 11 +++++++++--
.../service/authentication/PlainSASLHelperSuite.scala | 2 +-
...iFrontendService.scala => KyuubiFrontendServices.scala} | 14 +++++++-------
.../main/scala/org/apache/kyuubi/server/KyuubiServer.scala | 7 ++++++-
7 files changed, 29 insertions(+), 16 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 4126c7e..bed9f7b 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -43,7 +43,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
private val OOMHook = new Runnable { override def run(): Unit = stop() }
private val eventLogging = new EventLoggingService(this)
override val backendService = new SparkSQLBackendService(spark)
- override val frontendService = new ThriftFrontendService(backendService,
OOMHook)
+ val frontendService = new ThriftFrontendService(backendService, OOMHook)
override val discoveryService: Service = new EngineServiceDiscovery(this)
override protected def supportsServiceDiscovery: Boolean = {
@@ -54,6 +54,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
addService(eventLogging)
+ addService(frontendService)
super.initialize(conf)
eventLogging.onEvent(engineStatus.copy(state =
ServiceState.INITIALIZED.id))
}
@@ -76,6 +77,8 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
countDownLatch.countDown()
}
+ override def connectionUrl: String = frontendService.connectionUrl()
+
def engineId: String = {
spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId)
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
index 635ba67..a65ede7 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
@@ -21,7 +21,7 @@ package org.apache.kyuubi.service
* A basic abstraction for frontend services.
*/
abstract class AbstractFrontendService(name: String, be: BackendService)
- extends CompositeService(name) {
+ extends AbstractService(name) {
def connectionUrl(server: Boolean = false): String
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
index f18994e..b36b2f4 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
@@ -26,16 +26,14 @@ abstract class Serverable(name: String) extends
CompositeService(name) {
private val started = new AtomicBoolean(false)
val backendService: AbstractBackendService
- val frontendService: AbstractFrontendService
protected def supportsServiceDiscovery: Boolean
val discoveryService: Service
- def connectionUrl: String = frontendService.connectionUrl()
+ def connectionUrl: String
override def initialize(conf: KyuubiConf): Unit = synchronized {
this.conf = conf
addService(backendService)
- addService(frontendService)
if (supportsServiceDiscovery) {
// Service Discovery depends on the frontend service to be ready
addService(discoveryService)
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
index 233e112..644c8ba 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
@@ -18,13 +18,19 @@
package org.apache.kyuubi.service
import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.config.KyuubiConf
class NoopServer extends Serverable("noop") {
private val OOMHook = new Runnable { override def run(): Unit = stop() }
override val backendService = new NoopBackendService
- override val frontendService = new ThriftFrontendService(backendService,
OOMHook)
+ val frontendService = new ThriftFrontendService(backendService, OOMHook)
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ addService(frontendService)
+ super.initialize(conf)
+ }
override def start(): Unit = {
super.start()
@@ -37,7 +43,8 @@ class NoopServer extends Serverable("noop") {
throw new KyuubiException("no need to stop me")
}
-
override val discoveryService: Service = backendService
override protected val supportsServiceDiscovery: Boolean = false
+
+ override def connectionUrl: String = frontendService.connectionUrl()
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala
index 1b61715..b57b316 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/PlainSASLHelperSuite.scala
@@ -32,7 +32,7 @@ class PlainSASLHelperSuite extends KyuubiFunSuite {
val server = new NoopServer()
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
server.initialize(conf)
- val service = server.getServices(1).asInstanceOf[ThriftFrontendService]
+ val service = server.getServices(0).asInstanceOf[ThriftFrontendService]
val tProcessorFactory = PlainSASLHelper.getProcessFactory(service)
val tSocket = new TSocket("0.0.0.0", 0)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala
similarity index 78%
rename from
kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala
rename to
kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala
index f1f34bc..3b722ed 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiFrontendServices.scala
@@ -19,30 +19,30 @@ package org.apache.kyuubi.server
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.service.{AbstractFrontendService, BackendService,
ServiceState, ThriftFrontendService}
+import org.apache.kyuubi.service.{AbstractFrontendService, BackendService,
CompositeService, ServiceState, ThriftFrontendService}
/**
* A kyuubi frontend service is a kind of composite service, which used to
* composite multiple frontend services for kyuubi server.
*/
-class KyuubiFrontendService private(name: String, be: BackendService)
- extends AbstractFrontendService(name, be) with Logging {
+class KyuubiFrontendServices private(name: String, be: BackendService)
+ extends CompositeService(name) with Logging {
private val OOMHook = new Runnable { override def run(): Unit = stop() }
def this(be: BackendService) = {
- this(classOf[KyuubiFrontendService].getSimpleName, be)
+ this(classOf[KyuubiFrontendServices].getSimpleName, be)
}
- override def connectionUrl(server: Boolean): String = {
+ def connectionUrl(server: Boolean): String = {
getServiceState match {
case s @ ServiceState.LATENT => throw new
IllegalStateException(s"Illegal Service State: $s")
case _ =>
val defaultFEService =
getServices(0).asInstanceOf[AbstractFrontendService]
- if (defaultFEService != null) {
+ if (defaultFEService != null &&
defaultFEService.isInstanceOf[ThriftFrontendService]) {
defaultFEService.connectionUrl(server)
} else {
- throw new IllegalStateException("Can not find frontend services!")
+ throw new IllegalStateException("Can not find thrift frontend
services!")
}
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index e955331..d97f827 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -82,7 +82,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
override val backendService: AbstractBackendService = new
KyuubiBackendService()
- override val frontendService = new KyuubiFrontendService(backendService)
+ val frontendService = new KyuubiFrontendServices(backendService)
private val eventLoggingService: EventLoggingService = new
EventLoggingService
override protected def supportsServiceDiscovery: Boolean = {
ServiceDiscovery.supportServiceDiscovery(conf)
@@ -98,9 +98,14 @@ class KyuubiServer(name: String) extends Serverable(name) {
addService(new MetricsSystem)
}
+ addService(frontendService)
+
super.initialize(conf)
}
override protected def stopServer(): Unit = {}
+ override def connectionUrl: String = {
+ frontendService.connectionUrl(true)
+ }
}