This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new dfd7124 [KYUUBI #1988] Tune initialization of vars depend on system
level confs
dfd7124 is described below
commit dfd71242563dd4e9c79339f1123cd712706b06d1
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Mar 3 10:06:52 2022 +0800
[KYUUBI #1988] Tune initialization of vars depend on system level confs
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
The `new SparkConf()` and `KyuubiConf()` execute earlier than the UT, and
cannot be re-initialized after changing system properties, which makes conf
overwriting not work.
After the change,
`sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)` works as
expected when we overwrite `spark.sql.catalogImplementation` in the UT, and Hudi
uses the `in-memory` catalog to create tables.
<img width="891" alt="Xnip2022-03-02_19-10-06"
src="https://user-images.githubusercontent.com/26535726/156351805-bb403ee8-0b89-4db2-b91f-d7b2b9957838.png">
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1995 from pan3793/lazy.
Closes #1988
c264a224 [Cheng Pan] Fix
d1a23fbc [Cheng Pan] Lazy initialize vars depends on system level confs
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 37 ++++++++++++----------
.../kyuubi/engine/spark/WithSparkSQLEngine.scala | 4 +--
.../spark/operation/SparkDeltaOperationSuite.scala | 7 ----
.../spark/operation/SparkHudiOperationSuite.scala | 7 ----
.../operation/SparkIcebergOperationSuite.scala | 7 ----
5 files changed, 22 insertions(+), 40 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index c8839d8..933cbea 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -71,41 +71,48 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
object SparkSQLEngine extends Logging {
- val sparkConf: SparkConf = new SparkConf()
+ private var _sparkConf: SparkConf = _
- val kyuubiConf: KyuubiConf = KyuubiConf()
+ private var _kyuubiConf: KyuubiConf = _
+
+ def kyuubiConf: KyuubiConf = _kyuubiConf
var currentEngine: Option[SparkSQLEngine] = None
- private val user = currentUser
+ private lazy val user = currentUser
private val countDownLatch = new CountDownLatch(1)
+ SignalRegister.registerLogger(logger)
+ setupConf()
+
def setupConf(): Unit = {
- val rootDir =
sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(sparkConf))
+ _sparkConf = new SparkConf()
+ _kyuubiConf = KyuubiConf()
+ val rootDir =
_sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
- sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold",
"10000")
-
sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled",
"true")
- sparkConf.setIfMissing("spark.master", "local")
- sparkConf.setIfMissing("spark.ui.port", "0")
+ _sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold",
"10000")
+
_sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled",
"true")
+ _sparkConf.setIfMissing("spark.master", "local")
+ _sparkConf.setIfMissing("spark.ui.port", "0")
// register the repl's output dir with the file server.
// see also `spark.repl.classdir`
- sparkConf.set("spark.repl.class.outputDir",
outputDir.toFile.getAbsolutePath)
- sparkConf.setIfMissing(
+ _sparkConf.set("spark.repl.class.outputDir",
outputDir.toFile.getAbsolutePath)
+ _sparkConf.setIfMissing(
"spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads",
"20")
val appName = s"kyuubi_${user}_spark_${Instant.now}"
- sparkConf.setIfMissing("spark.app.name", appName)
+ _sparkConf.setIfMissing("spark.app.name", appName)
val defaultCat = if (KyuubiSparkUtil.hiveClassesArePresent) "hive" else
"in-memory"
- sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)
+ _sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY,
RetryPolicies.N_TIME.toString)
// Pass kyuubi config from spark with `spark.kyuubi`
val sparkToKyuubiPrefix = "spark.kyuubi."
- sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
+ _sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
kyuubiConf.set(s"kyuubi.$k", v)
}
@@ -117,7 +124,7 @@ object SparkSQLEngine extends Logging {
}
def createSpark(): SparkSession = {
- val session = SparkSession.builder.config(sparkConf).getOrCreate
+ val session = SparkSession.builder.config(_sparkConf).getOrCreate
(kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++
kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
.foreach { sqlStr =>
session.sparkContext.setJobGroup(
@@ -174,8 +181,6 @@ object SparkSQLEngine extends Logging {
}
def main(args: Array[String]): Unit = {
- SignalRegister.registerLogger(logger)
- setupConf()
val startedTime = System.currentTimeMillis()
val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match
{
case Some(t) => t.toLong
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index e292e71..3ed851d 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -28,7 +28,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
protected var engine: SparkSQLEngine = _
// conf will be loaded until start spark engine
def withKyuubiConf: Map[String, String]
- val kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
+ def kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
@@ -61,7 +61,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
System.setProperty("spark.ui.enabled", "false")
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
- kyuubiConf.set(k, v)
}
SparkSession.clearActiveSession()
@@ -82,7 +81,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
// we need to clean up conf since it's the global config in same jvm.
withKyuubiConf.foreach { case (k, _) =>
System.clearProperty(k)
- kyuubiConf.unset(k)
}
if (engine != null) {
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkDeltaOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkDeltaOperationSuite.scala
index 449cbe0..8ce27e3 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkDeltaOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkDeltaOperationSuite.scala
@@ -29,11 +29,4 @@ class SparkDeltaOperationSuite extends WithSparkSQLEngine
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = extraConfigs
-
- override def afterAll(): Unit = {
- super.afterAll()
- for ((k, _) <- withKyuubiConf) {
- System.clearProperty(k)
- }
- }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkHudiOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkHudiOperationSuite.scala
index 649e7c0..c5e8be3 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkHudiOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkHudiOperationSuite.scala
@@ -27,11 +27,4 @@ class SparkHudiOperationSuite extends WithSparkSQLEngine
with HudiMetadataTests
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = extraConfigs
-
- override def afterAll(): Unit = {
- super.afterAll()
- for ((k, _) <- withKyuubiConf) {
- System.clearProperty(k)
- }
- }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
index 0fbe762..48e791e 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
@@ -29,11 +29,4 @@ class SparkIcebergOperationSuite extends WithSparkSQLEngine
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = extraConfigs
-
- override def afterAll(): Unit = {
- super.afterAll()
- for ((k, _) <- withKyuubiConf) {
- System.clearProperty(k)
- }
- }
}