This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9615db55c [KYUUBI #5438] Add common method to get session level config
9615db55c is described below
commit 9615db55ce1dc5cacab5a422e89ac7ddd6e8c0b3
Author: David Yuan <[email protected]>
AuthorDate: Wed Nov 8 20:49:29 2023 +0800
[KYUUBI #5438] Add common method to get session level config
### _Why are the changes needed?_
Currently, in the spark-engine module, some session-level configurations are
ignored because getting session-level configurations in the Kyuubi Spark
engine is complex, as discussed in
https://github.com/apache/kyuubi/pull/5410#discussion_r1360164253. If unit
tests are to use the withSessionConf method, the code must read configuration
from the right session.
The PR is unfinished; it needs to wait for
https://github.com/apache/kyuubi/pull/5410 to succeed so that I can use the new
change in unit tests.
closes #5438
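As a rough illustration of the lookup order this change aims for (a
session-level value first, the engine-level Kyuubi config otherwise), here is a
minimal, self-contained sketch. It does not use the real Spark or Kyuubi
classes: `ConfigEntry`, `EngineConf`, `SessionConfSketch`, and the key
`example.show.progress` are hypothetical stand-ins, and a plain `Map` plays the
role of the session's runtime conf.

```scala
// Hypothetical stand-in for a typed config entry: a key, a String => T
// converter, and a default value. Not the real Kyuubi ConfigEntry.
final case class ConfigEntry[T](key: String, valueConverter: String => T, defaultValue: T)

// Hypothetical stand-in for the engine-wide configuration.
final class EngineConf(settings: Map[String, String]) {
  def get[T](entry: ConfigEntry[T]): T =
    settings.get(entry.key).map(entry.valueConverter).getOrElse(entry.defaultValue)
}

object SessionConfSketch {
  // Illustrative entry; the key name is made up for this example.
  val showProgress: ConfigEntry[Boolean] =
    ConfigEntry[Boolean]("example.show.progress", _.toBoolean, false)

  // Prefer the value set on the session; otherwise fall back to the
  // engine-level configuration (which itself falls back to the default).
  def getSessionConf[T](
      entry: ConfigEntry[T],
      sessionConf: Map[String, String],
      engineConf: EngineConf): T =
    sessionConf.get(entry.key).map(entry.valueConverter).getOrElse(engineConf.get(entry))

  def main(args: Array[String]): Unit = {
    val engine = new EngineConf(Map("example.show.progress" -> "false"))
    // Session overrides the engine-level value.
    println(getSessionConf(showProgress, Map("example.show.progress" -> "true"), engine))
    // No session value: the engine-level value is used.
    println(getSessionConf(showProgress, Map.empty[String, String], engine))
  }
}
```

Routing every lookup through one helper like this keeps the string-to-type
conversion in a single place, so call sites no longer need their own
`toBoolean` or `toInt` handling.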
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5487 from
davidyuan1223/5438_add_common_method_to_support_session_config.
Closes #5438
e1ded3654 [davidyuan] add more optional session level to get conf
84c4568d9 [davidyuan] add more optional session level to get conf
4d709023e [davidyuan] add more optional session level to get conf
96d7cde05 [davidyuan] Revert "add more optional session level to get conf"
940f8f878 [davidyuan] add more optional session level to get conf
15641e8ec [davidyuan] add more optional session level to get conf
d83893119 [davidyuan] Merge branch
'5438_add_common_method_to_support_session_config' of
https://github.com/davidyuan1223/kyuubi into
5438_add_common_method_to_support_session_config
2de96b5f8 [davidyuan] add common method to get session level config
3ec73adf8 [liangbowen] [KYUUBI #5522] [BATCH] Ignore main class for PySpark
batch job submission
d8b808dbe [Cheng Pan] [KYUUBI #5523] [DOC] Update the Kyuubi supported
components version
c7d15aed0 [Cheng Pan] [KYUUBI #5483] Release Spark TPC-H/DS Connectors with
Scala 2.13
4a1db4206 [zwangsheng] [KYUUBI #5513][BATCH] Always redirect delete batch
request to Kyuubi instance that owns batch session
b06e04485 [labbomb] [KYUUBI #5517] [UI] Initial implement the SQL Lab page
88bb6b4a8 [liangbowen] [KYUUBI #5486] Bump Kafka client version from 3.4.0
to 3.5.1
538a648cd [davidyuan] [KYUUBI #4186] Spark showProgress with JobInfo
682e5b5e3 [Xianxun Ye] [KYUUBI #5405] [FLINK] Support Flink 1.18
c71528ea3 [Cheng Pan] [KYUUBI #5484] Remove legacy Web UI
ee52b2a69 [Angerszhuuuu] [KYUUBI #5446][AUTHZ] Support
Create/Drop/Show/Reresh index command for Hudi
6a5bb1026 [weixi] [KYUUBI #5380][UT] Create PySpark batch jobs tests for
RESTful API
86f692d73 [Kent Yao] [KYUUBI #5512] [AuthZ] Remove the non-existent query
specs in Deletes and Updates
dfdd7a3f4 [fwang12] [KYUUBI #5499][KYUUBI #2503] Catch any exception when
closing idle session
b7b354485 [伟程] [KYUUBI #5212] Fix configuration errors causing by helm
charts of prometheus services
d123a5a1e [liupeiyue] [KYUUBI #5282] Support configure Trino session conf
in `kyuubi-default.conf`
075043754 [yangming] [KYUUBI #5294] [DOC] Update supported dialects for
JDBC engine
9c75d8252 [zwangsheng] [KYUUBI #5435][INFRA][TEST] Improve Kyuubi On
Kubernetes IT
1dc264add [Angerszhuuuu] [KYUUBI #5479][AUTHZ] Support Hudi
CallProcedureHoodieCommand for stored procedures
bc3fcbb4d [Angerszhuuuu] [KYUUBI #5472] Permanent View should pass column
when child plan no output
a67b8245a [Fantasy-Jay] [KYUUBI #5382][JDBC] Duplication cleanup
improvement in JdbcDialect and schema helpers
c039e1b6f [Kent Yao] [KYUUBI #5497] [AuthZ] Simplify debug message for
missing field/method in ReflectUtils
0c8be79ab [Angerszhuuuu] [KYUUBI #5475][FOLLOWUP] Authz check permanent
view's subquery should check view's correct privilege
1293cf208 [Kent Yao] [KYUUBI #5500] Add Kyuubi Code Program to Doc
e2754fedd [Angerszhuuuu] [KYUUBI #5492][AUTHZ] saveAsTable create
DataSource table miss db info
0c53d0079 [Angerszhuuuu] [KYUUBI #5447][FOLLOWUP] Remove unrelated debug
prints in TableIdentifierTableExtractor
119c393fc [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support Hudi
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
3af5ed13c [yikaifei] [KYUUBI #5427] [AUTHZ] Shade spark authz plugin
503c3f7fd [davidyuan] Merge remote-tracking branch
'origin/5438_add_common_method_to_support_session_config' into
5438_add_common_method_to_support_session_config
7a67ace08 [davidyuan] add common method to get session level config
3f4231735 [davidyuan] add common method to get session level config
bb5d5ce44 [davidyuan] add common method to get session level config
623200ff9 [davidyuan] Merge remote-tracking branch
'origin/5438_add_common_method_to_support_session_config' into
5438_add_common_method_to_support_session_config
8011959da [davidyuan] add common method to get session level config
605ef16bc [davidyuan] Merge remote-tracking branch
'origin/5438_add_common_method_to_support_session_config' into
5438_add_common_method_to_support_session_config
bb63ed87c [davidyuan] add common method to get session level config
d9cf248d4 [davidyuan] add common method to get session level config
c8647ef53 [davidyuan] add common method to get session level config
618c0f65e [david yuan] Merge branch 'apache:master' into
5438_add_common_method_to_support_session_config
c1024bded [david yuan] Merge branch 'apache:master' into
5438_add_common_method_to_support_session_config
32028f99f [davidyuan] add common method to get session level config
03e28874c [davidyuan] add common method to get session level config
Lead-authored-by: David Yuan <[email protected]>
Co-authored-by: davidyuan <[email protected]>
Co-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Co-authored-by: david yuan <[email protected]>
Co-authored-by: zwangsheng <[email protected]>
Co-authored-by: yangming <[email protected]>
Co-authored-by: 伟程 <[email protected]>
Co-authored-by: weixi <[email protected]>
Co-authored-by: fwang12 <[email protected]>
Co-authored-by: Xianxun Ye <[email protected]>
Co-authored-by: liupeiyue <[email protected]>
Co-authored-by: Fantasy-Jay <[email protected]>
Co-authored-by: yikaifei <[email protected]>
Co-authored-by: labbomb <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/spark/KyuubiSparkUtil.scala | 15 ++++++++++
.../engine/spark/operation/ExecutePython.scala | 9 ++----
.../engine/spark/operation/ExecuteStatement.scala | 3 +-
.../kyuubi/engine/spark/operation/GetTables.scala | 6 ++--
.../engine/spark/operation/PlanOnlyStatement.scala | 6 ++--
.../engine/spark/operation/SparkOperation.scala | 18 ++++--------
.../apache/spark/kyuubi/SQLOperationListener.scala | 10 +++----
.../spark/kyuubi/SQLOperationListenerSuite.scala | 34 ++++++++++++----------
8 files changed, 50 insertions(+), 51 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
index b9fb93259..2e33d8ce6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.ConfigEntry
import org.apache.kyuubi.util.SemanticVersion
object KyuubiSparkUtil extends Logging {
@@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging {
// Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
// represented as the runtime version of the Spark SQL engine.
lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
+
+ /**
+ * Get session level config value
+ * @param configEntry configEntry
+ * @param spark sparkSession
+ * @tparam T any type
+ * @return session level config value, if spark not set this config,
+ * default return kyuubi's config
+ */
+ def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = {
+ spark.conf.getOption(configEntry.key).map(configEntry.valueConverter).getOrElse {
+ SparkSQLEngine.kyuubiConf.get(configEntry)
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index badd83530..b3643a7ae 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -295,10 +295,8 @@ object ExecutePython extends Logging {
}
def getSparkPythonExecFromArchive(spark: SparkSession, session: Session): Option[String] = {
- val pythonEnvArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE.key)
- .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE))
- val pythonEnvExecPath = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH.key)
- .getOrElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH))
+ val pythonEnvArchive = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE, spark)
+ val pythonEnvExecPath = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, spark)
pythonEnvArchive.map {
archive =>
var uri = new URI(archive)
@@ -311,8 +309,7 @@ object ExecutePython extends Logging {
}
def getSparkPythonHomeFromArchive(spark: SparkSession, session: Session): Option[String] = {
- val pythonHomeArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_HOME_ARCHIVE.key)
- .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_HOME_ARCHIVE))
+ val pythonHomeArchive = getSessionConf(ENGINE_SPARK_PYTHON_HOME_ARCHIVE, spark)
pythonHomeArchive.map {
archive =>
var uri = new URI(archive)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 17d8a7412..acb49d65e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -148,8 +148,7 @@ class ExecuteStatement(
s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
private def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
- val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt)
- .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS))
+ val resultMaxRows: Int = getSessionConf(OPERATION_RESULT_MAX_ROWS, spark)
if (incrementalCollect) {
if (resultMaxRows > 0) {
warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
index 980e4fdb1..75ce94921 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -34,10 +35,7 @@ class GetTables(
extends SparkOperation(session) {
protected val ignoreTableProperties =
- spark.conf.getOption(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES.key) match {
- case Some(s) => s.toBoolean
- case _ => session.sessionManager.getConf.get(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES)
- }
+ getSessionConf(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES, spark)
override def statement: String = {
super.statement +
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 4f8808313..f2a670471 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
@@ -49,9 +50,7 @@ class PlanOnlyStatement(
.getOrElse(session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_EXCLUDES))
}
- private val style = PlanOnlyStyle.fromString(spark.conf.get(
- OPERATION_PLAN_ONLY_OUT_STYLE.key,
- session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_OUT_STYLE)))
+ private val style = PlanOnlyStyle.fromString(getSessionConf(OPERATION_PLAN_ONLY_OUT_STYLE, spark))
spark.conf.set(OPERATION_PLAN_ONLY_OUT_STYLE.key, style.name)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -74,7 +73,6 @@ class PlanOnlyStatement(
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
-
parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 1de360f07..f40f1d490 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper}
@@ -63,11 +63,8 @@ abstract class SparkOperation(session: Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)
- protected val operationSparkListenerEnabled =
- spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match {
- case Some(s) => s.toBoolean
- case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED)
- }
+ protected val operationSparkListenerEnabled: Boolean =
+ getSessionConf(OPERATION_SPARK_LISTENER_ENABLED, spark)
protected val operationListener: Option[SQLOperationListener] =
if (operationSparkListenerEnabled) {
@@ -80,10 +77,7 @@ abstract class SparkOperation(session: Session)
operationListener.foreach(spark.sparkContext.addSparkListener(_))
}
- private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match {
- case Some(s) => s.toBoolean
- case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
- }
+ private val progressEnable: Boolean = getSessionConf(SESSION_PROGRESS_ENABLE, spark)
protected def supportProgress: Boolean = false
@@ -113,9 +107,7 @@ abstract class SparkOperation(session: Session)
protected val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
- protected val schedulerPool =
- spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
- session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
+ protected val schedulerPool = getSessionConf(KyuubiConf.OPERATION_SCHEDULER_POOL, spark)
protected val isSessionUserSignEnabled: Boolean =
spark.sparkContext.getConf.getBoolean(
s"spark.${SESSION_USER_SIGN_ENABLED.key}",
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 686cb1f35..a7d409c7c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -27,10 +27,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog
@@ -50,15 +49,14 @@ class SQLOperationListener(
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None
- private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
- if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
+ if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
- conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
- conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
+ getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark),
+ getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark)))
} else {
None
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
index f732f7c38..c5d24399d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
@@ -29,9 +29,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
- override def withKyuubiConf: Map[String, String] = Map(
- KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
- KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
+ override def withKyuubiConf: Map[String, String] = Map.empty
override protected def jdbcUrl: String = getJdbcUrl
@@ -58,19 +56,23 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp
}
test("operation listener with progress job info") {
- val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
- withSessionHandle { (client, handle) =>
- val req = new TExecuteStatementReq()
- req.setSessionHandle(handle)
- req.setStatement(sql)
- val tExecuteStatementResp = client.ExecuteStatement(req)
- val opHandle = tExecuteStatementResp.getOperationHandle
- val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
- fetchResultsReq.setFetchType(1.toShort)
- eventually(timeout(90.seconds), interval(500.milliseconds)) {
- val resultsResp = client.FetchResults(fetchResultsReq)
- val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
- assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
+ withSessionConf(Map(
+ KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
+ KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200"))()() {
+ val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
+ withSessionHandle { (client, handle) =>
+ val req = new TExecuteStatementReq()
+ req.setSessionHandle(handle)
+ req.setStatement(sql)
+ val tExecuteStatementResp = client.ExecuteStatement(req)
+ val opHandle = tExecuteStatementResp.getOperationHandle
+ val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
+ fetchResultsReq.setFetchType(1.toShort)
+ eventually(timeout(90.seconds), interval(500.milliseconds)) {
+ val resultsResp = client.FetchResults(fetchResultsReq)
+ val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+ assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
+ }
}
}
}