This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 13bf9703f257 [SPARK-52410] Deprecate `PipelineConf` and use `SqlConf` directly
13bf9703f257 is described below

commit 13bf9703f2571eed8faee56bbb9c37f9598539ec
Author: Yuheng Chang <jonathanyuh...@gmail.com>
AuthorDate: Mon Jun 9 13:45:39 2025 -0700

    [SPARK-52410] Deprecate `PipelineConf` and use `SqlConf` directly
    
    ### What changes were proposed in this pull request?
    
    As suggested by [the comment](https://github.com/apache/spark/pull/51050#discussion_r2130318317),
    directly use `SqlConf` instead of maintaining the thin `PipelineConf` wrapper.
    
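    In code terms, a minimal sketch of the migration (assuming a `spark`
    session is in scope; `PipelineConf` is the wrapper this PR removes):

    ```scala
    // Before (removed): settings were read through the thin wrapper.
    // val retries = new PipelineConf(spark).maxFlowRetryAttempts

    // After: the same getter lives on SQLConf, reached via the session state.
    val retries = spark.sessionState.conf.maxFlowRetryAttempts
    ```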
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51137 from SCHJonathan/jonathan-chang_data/deprecate-pipeline-conf.
    
    Authored-by: Yuheng Chang <jonathanyuh...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 25 +++++++++
 .../spark/sql/pipelines/graph/GraphExecution.scala |  8 +--
 .../spark/sql/pipelines/graph/PipelineConf.scala   | 64 ----------------------
 .../pipelines/graph/PipelineUpdateContext.scala    |  3 -
 .../graph/PipelineUpdateContextImpl.scala          | 10 ++--
 .../sql/pipelines/graph/PipelinesErrors.scala      |  1 -
 .../pipelines/graph/TriggeredGraphExecution.scala  | 19 ++++---
 .../spark/sql/pipelines/utils/ExecutionTest.scala  |  2 -
 8 files changed, 43 insertions(+), 89 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8246fff00f7e..a322ec8a7e21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -6996,6 +6996,31 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def legacyOutputSchema: Boolean = getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)
 
+  def streamStatePollingInterval: Long = getConf(SQLConf.PIPELINES_STREAM_STATE_POLLING_INTERVAL)
+
+  def watchdogMinRetryTimeInSeconds: Long = {
+    getConf(SQLConf.PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS)
+  }
+
+  def watchdogMaxRetryTimeInSeconds: Long = {
+    val value = getConf(SQLConf.PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS)
+    if (value < watchdogMinRetryTimeInSeconds) {
+      throw new IllegalArgumentException(
+        "Watchdog maximum retry time must be greater than or equal to the 
watchdog minimum " +
+          "retry time."
+      )
+    }
+    value
+  }
+
+  def maxConcurrentFlows: Int = getConf(SQLConf.PIPELINES_MAX_CONCURRENT_FLOWS)
+
+  def timeoutMsForTerminationJoinAndLock: Long = {
+    getConf(SQLConf.PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK)
+  }
+
+  def maxFlowRetryAttempts: Int = getConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index fdb94c82d868..381449711dbb 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -36,8 +36,6 @@ abstract class GraphExecution(
   /** The `Trigger` configuration for a streaming flow. */
   def streamTrigger(flow: Flow): Trigger
 
-  protected val pipelineConf: PipelineConf = env.pipelineConf
-
   /** Maps flow identifier to count of consecutive failures. Used to manage flow retries */
   private val flowToNumConsecutiveFailure = new ConcurrentHashMap[TableIdentifier, Int].asScala
 
@@ -202,7 +200,7 @@ abstract class GraphExecution(
       .get(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key)
       .map(_.toInt) // Flow-level conf
       // Pipeline-level conf, else default flow retry limit
-      .getOrElse(pipelineConf.maxFlowRetryAttempts)
+      .getOrElse(env.spark.sessionState.conf.maxFlowRetryAttempts)
   }
 
   /**
@@ -211,7 +209,7 @@ abstract class GraphExecution(
   def stopThread(thread: Thread): Unit = {
     // Don't wait to join if current thread is the thread to stop
     if (thread.getId != Thread.currentThread().getId) {
-      thread.join(env.pipelineConf.timeoutMsForTerminationJoinAndLock)
+      thread.join(env.spark.sessionState.conf.timeoutMsForTerminationJoinAndLock)
       // thread is alive after we join.
       if (thread.isAlive) {
         throw new TimeoutException("Failed to stop the update due to a hanging control thread.")
@@ -268,14 +266,12 @@ object GraphExecution extends Logging {
    *  matching logic.
    * @param ex Exception to analyze.
    * @param flowDisplayName The user facing flow name with the error.
-   * @param pipelineConf Pipeline configuration.
    * @param currentNumTries Number of times the flow has been tried.
    * @param maxAllowedRetries Maximum number of retries allowed for the flow.
    */
   def determineFlowExecutionActionFromError(
       ex: => Throwable,
       flowDisplayName: => String,
-      pipelineConf: => PipelineConf,
       currentNumTries: => Int,
       maxAllowedRetries: => Int
   ): FlowExecutionAction = {
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala
deleted file mode 100644
index 42648c6ef58b..000000000000
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineConf.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.pipelines.graph
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Configuration for the pipeline system, which is read from the Spark session's SQL configuration.
- */
-@deprecated("TODO(SPARK-52410): Remove this class in favor of using SqlConf directly")
-class PipelineConf(spark: SparkSession) {
-  private val sqlConf: SQLConf = spark.sessionState.conf
-
-  /** Interval in milliseconds to poll the state of streaming flow execution. */
-  val streamStatePollingInterval: Long = sqlConf.getConf(
-    SQLConf.PIPELINES_STREAM_STATE_POLLING_INTERVAL
-  )
-
-  /** Minimum time in seconds between retries for the watchdog. */
-  val watchdogMinRetryTimeInSeconds: Long = {
-    sqlConf.getConf(SQLConf.PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS)
-  }
-
-  /** Maximum time in seconds for the watchdog to retry before giving up. */
-  val watchdogMaxRetryTimeInSeconds: Long = {
-    val value = sqlConf.getConf(SQLConf.PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS)
-    // TODO(SPARK-52410): Remove this check and use `checkValue` when defining the conf
-    //                    in `SqlConf`.
-    if (value < watchdogMinRetryTimeInSeconds) {
-      throw new IllegalArgumentException(
-        "Watchdog maximum retry time must be greater than or equal to the 
watchdog minimum " +
-        "retry time."
-      )
-    }
-    value
-  }
-
-  /** Maximum number of concurrent flows that can be executed. */
-  val maxConcurrentFlows: Int = sqlConf.getConf(SQLConf.PIPELINES_MAX_CONCURRENT_FLOWS)
-
-  /** Timeout in milliseconds for termination join and lock operations. */
-  val timeoutMsForTerminationJoinAndLock: Long = {
-    sqlConf.getConf(SQLConf.PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK)
-  }
-
-  /** Maximum number of retry attempts for a flow execution. */
-  val maxFlowRetryAttempts: Int = sqlConf.getConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS)
-}
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
index 93d608dd7668..5a1ab88a432f 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala
@@ -50,9 +50,6 @@ trait PipelineUpdateContext {
     UnionFlowFilter(flowFilterForTables, resetCheckpointFlows)
   }
 
-  /** `PipelineConf` based on the root SparkSession for this update. */
-  def pipelineConf: PipelineConf
-
   /** Buffer containing internal events that are emitted during a run of a pipeline. */
   def eventBuffer: PipelineRunEventBuffer
 
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
index 8192068af67e..30f56d7a8e2c 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.pipelines.graph
 import scala.annotation.unused
 
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent, PipelineRunEventBuffer}
+import org.apache.spark.sql.pipelines.logging.{
+  FlowProgressEventLogger,
+  PipelineEvent,
+  PipelineRunEventBuffer
+}
 
 /**
  * An implementation of the PipelineUpdateContext trait used in production.
@@ -29,7 +33,7 @@ import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, Pipeline
  */
 @unused(
   "TODO(SPARK-51727) construct this spark connect server when we expose APIs 
for users " +
-    "to interact with a pipeline"
+  "to interact with a pipeline"
 )
 class PipelineUpdateContextImpl(
     override val unresolvedGraph: DataflowGraph,
@@ -40,8 +44,6 @@ class PipelineUpdateContextImpl(
     throw new IllegalStateException("SparkSession is not available")
   )
 
-  override val pipelineConf: PipelineConf = new PipelineConf(spark)
-
   override val eventBuffer = new PipelineRunEventBuffer(eventCallback)
 
   override val flowProgressEventLogger: FlowProgressEventLogger =
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
index a0e378f85bce..7116f5fbcf06 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala
@@ -128,7 +128,6 @@ object PipelinesErrors extends Logging {
       val actionFromError = GraphExecution.determineFlowExecutionActionFromError(
         ex = ex,
         flowDisplayName = flow.displayName,
-        pipelineConf = env.pipelineConf,
         currentNumTries = prevFailureCount + 1,
         maxAllowedRetries = maxRetries
       )
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala
index fdac2cdb2fc9..503a1aa8e281 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala
@@ -69,8 +69,8 @@ class TriggeredGraphExecution(
 
   /** Back-off strategy used to determine duration between retries. */
   private val backoffStrategy = ExponentialBackoffStrategy(
-    maxTime = (pipelineConf.watchdogMaxRetryTimeInSeconds * 1000).millis,
-    stepSize = (pipelineConf.watchdogMinRetryTimeInSeconds * 1000).millis
+    maxTime = (env.spark.sessionState.conf.watchdogMaxRetryTimeInSeconds * 1000).millis,
+    stepSize = (env.spark.sessionState.conf.watchdogMinRetryTimeInSeconds * 1000).millis
   )
 
   override def streamTrigger(flow: Flow): Trigger = {
@@ -128,7 +128,9 @@ class TriggeredGraphExecution(
   }
 
   /** Used to control how many flows are executing at once. */
-  private val concurrencyLimit: Semaphore = new Semaphore(pipelineConf.maxConcurrentFlows)
+  private val concurrencyLimit: Semaphore = new Semaphore(
+    env.spark.sessionState.conf.maxConcurrentFlows
+  )
 
   /**
    * Runs the pipeline in a topological order.
@@ -180,11 +182,11 @@ class TriggeredGraphExecution(
       val (runningFlows, availablePermits) = concurrencyLimit.synchronized {
         (flowsWithState(StreamState.RUNNING).size, concurrencyLimit.availablePermits)
       }
-      if ((runningFlows + availablePermits) < pipelineConf.maxConcurrentFlows) {
+      if ((runningFlows + availablePermits) < env.spark.sessionState.conf.maxConcurrentFlows) {
         val errorStr =
-          s"The max concurrency is ${pipelineConf.maxConcurrentFlows}, but 
there are only " +
-          s"$availablePermits permits available with $runningFlows flows 
running. If this " +
-          s"happens consistently, it's possible we're leaking permits."
+          s"The max concurrency is 
${env.spark.sessionState.conf.maxConcurrentFlows}, but " +
+          s"there are only $availablePermits permits available with 
$runningFlows flows running. " +
+          s"If this happens consistently, it's possible we're leaking permits."
         logError(errorStr)
         if (Utils.isTesting) {
           throw new IllegalStateException(errorStr)
@@ -250,7 +252,7 @@ class TriggeredGraphExecution(
       try {
         // Put thread to sleep for the configured polling interval to avoid busy-waiting
         // and holding one CPU core.
-        Thread.sleep(pipelineConf.streamStatePollingInterval * 1000)
+        Thread.sleep(env.spark.sessionState.conf.streamStatePollingInterval * 1000)
       } catch {
         case _: InterruptedException => return
       }
@@ -298,7 +300,6 @@ class TriggeredGraphExecution(
         lastExceptionAction = GraphExecution.determineFlowExecutionActionFromError(
           ex = e,
           flowDisplayName = flow.displayName,
-          pipelineConf = pipelineConf,
           currentNumTries = prevFailureCount + 1,
           maxAllowedRetries = maxRetryAttemptsForFlow(flowIdentifier)
         )
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
index 7aa2f9d63106..96db55f8ebfa 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.pipelines.graph.{
   DataflowGraph,
   FlowFilter,
   NoTables,
-  PipelineConf,
   PipelineUpdateContext,
   TableFilter
 }
@@ -62,7 +61,6 @@ trait TestPipelineUpdateContextMixin {
       refreshTables: TableFilter = AllTables,
       resetCheckpointFlows: FlowFilter = AllFlows
   ) extends PipelineUpdateContext {
-    val pipelineConf: PipelineConf = new PipelineConf(spark)
     val eventBuffer = new PipelineRunEventBuffer(eventCallback = _ => ())
 
     override def flowProgressEventLogger: FlowProgressEventLogger = {


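A minimal usage sketch of the behavior after this change (assuming a `spark`
session is in scope and that the conf is settable on a live session; the conf
entry and getter names are the ones added in the diff above):

```scala
import org.apache.spark.sql.internal.SQLConf

// Pipeline settings are now plain SQL confs: set them on the session and
// read them back through the new SQLConf getters.
spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "5")
assert(spark.sessionState.conf.maxFlowRetryAttempts == 5)

// The min/max watchdog relationship is still validated at read time:
// `watchdogMaxRetryTimeInSeconds` throws an IllegalArgumentException when
// the configured maximum is smaller than the configured minimum.
```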
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
