This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 20ee845 [KYUUBI #1386] Refactor SparkThriftBinaryFrontendService to
extract connectionUrl into super class
20ee845 is described below
commit 20ee845c0bff1a9e6d39417821b3222efa03859c
Author: yanghua <[email protected]>
AuthorDate: Tue Nov 16 09:41:55 2021 +0800
[KYUUBI #1386] Refactor SparkThriftBinaryFrontendService to extract
connectionUrl into super class
…
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1387 from yanghua/KYUUBI-1386.
Closes #1386
29415904 [yanghua] [KYUUBI #1386] Refactor SparkThriftBinaryFrontendService
to extract connectionUrl into super class
Authored-by: yanghua <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../spark/SparkThriftBinaryFrontendService.scala | 20 --------------------
.../kyuubi/service/ThriftBinaryFrontendService.scala | 19 ++++++++++++++++++-
2 files changed, 18 insertions(+), 21 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
index ee0862d..b740fae 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
@@ -24,7 +24,6 @@ import
org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDeleg
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CONNECTION_URL_USE_HOSTNAME
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service,
ThriftBinaryFrontendService}
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -137,25 +136,6 @@ class SparkThriftBinaryFrontendService(
None
}
}
-
- override def connectionUrl: String = {
- checkInitialized()
- if (conf.get(ENGINE_CONNECTION_URL_USE_HOSTNAME)) {
- s"${serverAddr.getCanonicalHostName}:$portNum"
- } else {
- // engine use address if run on k8s with cluster mode
- s"${serverAddr.getHostAddress}:$portNum"
- }
- }
-
- // When a OOM occurs, here we de-register the engine by stop its
discoveryService.
- // Then the current engine will not be connected by new client anymore but
keep the existing ones
- // alive. In this case we can reduce the engine's overhead and make it
possible recover from that.
- // We shall not tear down the whole engine by serverable.stop to make the
engine unreachable for
- // the existing clients which are still getting statuses and reporting to
the end-users.
- override protected def oomHook: Runnable = {
- () => discoveryService.foreach(_.stop())
- }
}
object SparkThriftBinaryFrontendService {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index 7b3213f..0b43f96 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -51,7 +51,14 @@ abstract class ThriftBinaryFrontendService(name: String)
private var authFactory: KyuubiAuthenticationFactory = _
private var hadoopConf: Configuration = _
- protected def oomHook: Runnable
+ // When a OOM occurs, here we de-register the engine by stop its
discoveryService.
+ // Then the current engine will not be connected by new client anymore but
keep the existing ones
+ // alive. In this case we can reduce the engine's overhead and make it
possible recover from that.
+ // We shall not tear down the whole engine by serverable.stop to make the
engine unreachable for
+ // the existing clients which are still getting statuses and reporting to
the end-users.
+ protected def oomHook: Runnable = {
+ () => discoveryService.foreach(_.stop())
+ }
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
@@ -136,6 +143,16 @@ abstract class ThriftBinaryFrontendService(name: String)
super.stop()
}
+ override def connectionUrl: String = {
+ checkInitialized()
+ if (conf.get(ENGINE_CONNECTION_URL_USE_HOSTNAME)) {
+ s"${serverAddr.getCanonicalHostName}:$portNum"
+ } else {
+ // engine use address if run on k8s with cluster mode
+ s"${serverAddr.getHostAddress}:$portNum"
+ }
+ }
+
private def getProxyUser(
sessionConf: java.util.Map[String, String],
ipAddress: String,