This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 13bf9703f257 [SPARK-52410] Deprecate `PipelineConf` and use `SqlConf` directly
13bf9703f257 is described below

commit 13bf9703f2571eed8faee56bbb9c37f9598539ec
Author: Yuheng Chang <jonathanyuh...@gmail.com>
AuthorDate: Mon Jun 9 13:45:39 2025 -0700

    [SPARK-52410] Deprecate `PipelineConf` and use `SqlConf` directly

    ### What changes were proposed in this pull request?

    As suggested by [the comment](https://github.com/apache/spark/pull/51050#discussion_r2130318317), directly use `SqlConf` instead of maintaining the thin `PipelineConf` wrapper.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51137 from SCHJonathan/jonathan-chang_data/deprecate-pipeline-conf.

    Authored-by: Yuheng Chang <jonathanyuh...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 25 +++++++++
 .../spark/sql/pipelines/graph/GraphExecution.scala |  8 +--
 .../spark/sql/pipelines/graph/PipelineConf.scala   | 64 ----------------------
 .../pipelines/graph/PipelineUpdateContext.scala    |  3 -
 .../graph/PipelineUpdateContextImpl.scala          | 10 ++--
 .../sql/pipelines/graph/PipelinesErrors.scala      |  1 -
 .../pipelines/graph/TriggeredGraphExecution.scala  | 19 ++++---
 .../spark/sql/pipelines/utils/ExecutionTest.scala  |  2 -
 8 files changed, 43 insertions(+), 89 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8246fff00f7e..a322ec8a7e21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -6996,6 +6996,31 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def legacyOutputSchema: Boolean = getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)
 
+  def streamStatePollingInterval: Long = getConf(SQLConf.PIPELINES_STREAM_STATE_POLLING_INTERVAL)
+
+  def watchdogMinRetryTimeInSeconds: Long = {
+    getConf(SQLConf.PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS)
+  }
+
+  def watchdogMaxRetryTimeInSeconds: Long = {
+    val value = getConf(SQLConf.PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS)
+    if (value < watchdogMinRetryTimeInSeconds) {
+      throw new IllegalArgumentException(
+        "Watchdog maximum retry time must be greater than or equal to the watchdog minimum " +
+          "retry time."
+      )
+    }
+    value
+  }
+
+  def maxConcurrentFlows: Int = getConf(SQLConf.PIPELINES_MAX_CONCURRENT_FLOWS)
+
+  def timeoutMsForTerminationJoinAndLock: Long = {
+    getConf(SQLConf.PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK)
+  }
+
+  def maxFlowRetryAttempts: Int = getConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
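For context, the call-site change these new getters enable is mechanical. A minimal sketch, assuming code compiled inside Spark's own sql package tree (as the pipelines module is, since `sessionState` is private[sql]); `ConfAccessSketch` and its method are hypothetical names, not part of this commit:

    package org.apache.spark.sql.pipelines.graph

    import org.apache.spark.sql.classic.SparkSession

    // Sketch only: `sessionState` is private[sql], so this resolves from inside
    // Spark's own sql packages, not from user code.
    object ConfAccessSketch {
      // Before this commit the same value was read through the wrapper:
      //   new PipelineConf(spark).maxFlowRetryAttempts
      // After it, the getter added to SQLConf above is used directly:
      def maxFlowRetryAttempts(spark: SparkSession): Int =
        spark.sessionState.conf.maxFlowRetryAttempts
    }
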
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index fdb94c82d868..381449711dbb 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -36,8 +36,6 @@ abstract class GraphExecution(
   /** The `Trigger` configuration for a streaming flow. */
   def streamTrigger(flow: Flow): Trigger
 
-  protected val pipelineConf: PipelineConf = env.pipelineConf
-
   /** Maps flow identifier to count of consecutive failures. Used to manage flow retries */
   private val flowToNumConsecutiveFailure = new ConcurrentHashMap[TableIdentifier, Int].asScala
 
@@ -202,7 +200,7 @@ abstract class GraphExecution(
       .get(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key)
       .map(_.toInt) // Flow-level conf
       // Pipeline-level conf, else default flow retry limit
-      .getOrElse(pipelineConf.maxFlowRetryAttempts)
+      .getOrElse(env.spark.sessionState.conf.maxFlowRetryAttempts)
   }
 
   /**
@@ -211,7 +209,7 @@ abstract class GraphExecution(
   def stopThread(thread: Thread): Unit = {
    // Don't wait to join if current thread is the thread to stop
    if (thread.getId != Thread.currentThread().getId) {
-      thread.join(env.pipelineConf.timeoutMsForTerminationJoinAndLock)
+      thread.join(env.spark.sessionState.conf.timeoutMsForTerminationJoinAndLock)
      // thread is alive after we join.
      if (thread.isAlive) {
        throw new TimeoutException("Failed to stop the update due to a hanging control thread.")
@@ -268,14 +266,12 @@ object GraphExecution extends Logging {
   * matching logic.
   * @param ex Exception to analyze.
   * @param flowDisplayName The user facing flow name with the error.
-  * @param pipelineConf Pipeline configuration.
   * @param currentNumTries Number of times the flow has been tried.
   * @param maxAllowedRetries Maximum number of retries allowed for the flow.
   */
  def determineFlowExecutionActionFromError(
      ex: => Throwable,
      flowDisplayName: => String,
-      pipelineConf: => PipelineConf,
      currentNumTries: => Int,
      maxAllowedRetries: => Int
  ): FlowExecutionAction = {
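The `@@ -202,7 +200,7 @@` hunk keeps the two-level retry resolution: a value set in the flow's own conf under `SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key` wins, and the session-level SQLConf supplies the default. A self-contained sketch of that precedence pattern; the map, key string, and object name are illustrative stand-ins for the real flow conf and conf key:

    // Stand-alone sketch of flow-over-pipeline conf precedence; not the real API.
    object RetryConfSketch {
      def resolveMaxRetries(flowConf: Map[String, String], sessionDefault: Int): Int =
        flowConf
          .get("maxFlowRetryAttempts") // hypothetical flow-level conf key
          .map(_.toInt)                // flow-level value wins when present
          .getOrElse(sessionDefault)   // else fall back to the session-level default

      def main(args: Array[String]): Unit = {
        println(resolveMaxRetries(Map("maxFlowRetryAttempts" -> "5"), sessionDefault = 2)) // 5
        println(resolveMaxRetries(Map.empty, sessionDefault = 2))                          // 2
      }
    }
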
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala
deleted file mode 100644
index 42648c6ef58b..000000000000
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.pipelines.graph
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Configuration for the pipeline system, which is read from the Spark session's SQL configuration.
- */
-@deprecated("TODO(SPARK-52410): Remove this class in favor of using SqlConf directly")
-class PipelineConf(spark: SparkSession) {
-  private val sqlConf: SQLConf = spark.sessionState.conf
-
-  /** Interval in milliseconds to poll the state of streaming flow execution. */
-  val streamStatePollingInterval: Long = sqlConf.getConf(
-    SQLConf.PIPELINES_STREAM_STATE_POLLING_INTERVAL
-  )
-
-  /** Minimum time in seconds between retries for the watchdog. */
-  val watchdogMinRetryTimeInSeconds: Long = {
-    sqlConf.getConf(SQLConf.PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS)
-  }
-
-  /** Maximum time in seconds for the watchdog to retry before giving up. */
-  val watchdogMaxRetryTimeInSeconds: Long = {
-    val value = sqlConf.getConf(SQLConf.PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS)
-    // TODO(SPARK-52410): Remove this check and use `checkValue` when defining the conf
-    // in `SqlConf`.
-    if (value < watchdogMinRetryTimeInSeconds) {
-      throw new IllegalArgumentException(
-        "Watchdog maximum retry time must be greater than or equal to the watchdog minimum " +
-          "retry time."
-      )
-    }
-    value
-  }
-
-  /** Maximum number of concurrent flows that can be executed. */
-  val maxConcurrentFlows: Int = sqlConf.getConf(SQLConf.PIPELINES_MAX_CONCURRENT_FLOWS)
-
-  /** Timeout in milliseconds for termination join and lock operations. */
-  val timeoutMsForTerminationJoinAndLock: Long = {
-    sqlConf.getConf(SQLConf.PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK)
-  }
-
-  /** Maximum number of retry attempts for a flow execution. */
-  val maxFlowRetryAttempts: Int = sqlConf.getConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS)
-}
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
index 93d608dd7668..5a1ab88a432f 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
@@ -50,9 +50,6 @@ trait PipelineUpdateContext {
     UnionFlowFilter(flowFilterForTables, resetCheckpointFlows)
   }
 
-  /** `PipelineConf` based on the root SparkSession for this update. */
-  def pipelineConf: PipelineConf
-
   /** Buffer containing internal events that are emitted during a run of a pipeline. */
   def eventBuffer: PipelineRunEventBuffer
 
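The deleted wrapper's TODO explains why the watchdog min/max check stays in the getter rather than moving to `checkValue`: `checkValue` validates a single conf in isolation, while this invariant spans two confs, so it can only be enforced when the pair is read together. A self-contained sketch of that read-time validation, with plain Longs standing in for the two conf reads; the object and method names are hypothetical:

    // Sketch of cross-conf validation at read time; the arguments stand in for
    // getConf(PIPELINES_WATCHDOG_{MIN,MAX}_RETRY_TIME_IN_SECONDS).
    object WatchdogConfSketch {
      def validatedMaxRetryTime(minSeconds: Long, maxSeconds: Long): Long = {
        if (maxSeconds < minSeconds) {
          throw new IllegalArgumentException(
            "Watchdog maximum retry time must be greater than or equal to the watchdog " +
              "minimum retry time.")
        }
        maxSeconds
      }

      def main(args: Array[String]): Unit = {
        println(validatedMaxRetryTime(minSeconds = 5, maxSeconds = 300)) // ok: 300
        // validatedMaxRetryTime(minSeconds = 5, maxSeconds = 1)         // throws
      }
    }
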
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
index 8192068af67e..30f56d7a8e2c 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.pipelines.graph
 import scala.annotation.unused
 
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent, PipelineRunEventBuffer}
+import org.apache.spark.sql.pipelines.logging.{
+  FlowProgressEventLogger,
+  PipelineEvent,
+  PipelineRunEventBuffer
+}
 
 /**
  * An implementation of the PipelineUpdateContext trait used in production.
@@ -29,7 +33,7 @@ import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, Pipeline
  */
 @unused(
   "TODO(SPARK-51727) construct this spark connect server when we expose APIs for users " +
-  "to interact with a pipeline"
+    "to interact with a pipeline"
 )
 class PipelineUpdateContextImpl(
     override val unresolvedGraph: DataflowGraph,
@@ -40,8 +44,6 @@ class PipelineUpdateContextImpl(
       throw new IllegalStateException("SparkSession is not available")
     )
 
-  override val pipelineConf: PipelineConf = new PipelineConf(spark)
-
   override val eventBuffer = new PipelineRunEventBuffer(eventCallback)
 
   override val flowProgressEventLogger: FlowProgressEventLogger =
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
index a0e378f85bce..7116f5fbcf06 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
@@ -128,7 +128,6 @@ object PipelinesErrors extends Logging {
     val actionFromError = GraphExecution.determineFlowExecutionActionFromError(
       ex = ex,
       flowDisplayName = flow.displayName,
-      pipelineConf = env.pipelineConf,
       currentNumTries = prevFailureCount + 1,
       maxAllowedRetries = maxRetries
     )
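The TriggeredGraphExecution hunks below feed the two watchdog confs into `ExponentialBackoffStrategy` as `stepSize` and `maxTime` (seconds scaled to millis). A rough, self-contained sketch of a capped exponential back-off, assuming the delay doubles per consecutive failure; the real strategy's curve may differ, and `BackoffSketch` is a hypothetical name:

    import scala.concurrent.duration._

    // Rough sketch of a capped exponential back-off; assumes doubling per
    // failure, which may not match the real ExponentialBackoffStrategy exactly.
    object BackoffSketch {
      def backoff(stepSize: FiniteDuration, maxTime: FiniteDuration, numFailures: Int): FiniteDuration = {
        val exponent = math.min(numFailures - 1, 30).max(0) // clamp to avoid overflow
        val delay = stepSize * (1L << exponent)
        if (delay > maxTime) maxTime else delay
      }

      def main(args: Array[String]): Unit = {
        // In the code below: stepSize = watchdogMin * 1000 millis, maxTime = watchdogMax * 1000 millis.
        (1 to 5).foreach(n => println(backoff(5.seconds, 60.seconds, n))) // 5s, 10s, 20s, 40s, 60s
      }
    }
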
" + + s"If this happens consistently, it's possible we're leaking permits." logError(errorStr) if (Utils.isTesting) { throw new IllegalStateException(errorStr) @@ -250,7 +252,7 @@ class TriggeredGraphExecution( try { // Put thread to sleep for the configured polling interval to avoid busy-waiting // and holding one CPU core. - Thread.sleep(pipelineConf.streamStatePollingInterval * 1000) + Thread.sleep(env.spark.sessionState.conf.streamStatePollingInterval * 1000) } catch { case _: InterruptedException => return } @@ -298,7 +300,6 @@ class TriggeredGraphExecution( lastExceptionAction = GraphExecution.determineFlowExecutionActionFromError( ex = e, flowDisplayName = flow.displayName, - pipelineConf = pipelineConf, currentNumTries = prevFailureCount + 1, maxAllowedRetries = maxRetryAttemptsForFlow(flowIdentifier) ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala index 7aa2f9d63106..96db55f8ebfa 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.pipelines.graph.{ DataflowGraph, FlowFilter, NoTables, - PipelineConf, PipelineUpdateContext, TableFilter } @@ -62,7 +61,6 @@ trait TestPipelineUpdateContextMixin { refreshTables: TableFilter = AllTables, resetCheckpointFlows: FlowFilter = AllFlows ) extends PipelineUpdateContext { - val pipelineConf: PipelineConf = new PipelineConf(spark) val eventBuffer = new PipelineRunEventBuffer(eventCallback = _ => ()) override def flowProgressEventLogger: FlowProgressEventLogger = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org