This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba38c2c3305 [SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries
4ba38c2c3305 is described below

commit 4ba38c2c33055a853c7d39a974c670914dc8a049
Author: Jacky Wang <jacky.w...@databricks.com>
AuthorDate: Mon Sep 1 23:13:28 2025 -0700

    [SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries
    
    ### What changes were proposed in this pull request?
    
    Fixes the flow failure counter increment by switching to `updateWith`, which prevents the first increment from throwing a `NoSuchElementException` when the key does not yet exist.
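
    A minimal standalone sketch of the `updateWith` semantics (using hypothetical `String` keys in place of the real `TableIdentifier` keys of `flowToNumConsecutiveFailure`):
    ```scala
    import scala.collection.mutable

    val failures = mutable.Map.empty[String, Int]

    def recordFailure(flow: String): Unit =
      failures.updateWith(flow) {
        case Some(count) => Some(count + 1) // subsequent failures: increment
        case None        => Some(1)         // first failure: seed the counter at 1
      }

    recordFailure("flow_a") // failures("flow_a") == 1, no exception thrown
    recordFailure("flow_a") // failures("flow_a") == 2
    ```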
    
    ### Why are the changes needed?
    
    The current implementation, `flowToNumConsecutiveFailure(flowIdentifier)`, throws an exception when a flow is retried for the first time:
    ```
    java.util.NoSuchElementException: key not found: `spark_catalog`.`test_db`.`mv`
            at scala.collection.MapOps.default(Map.scala:289)
            at scala.collection.MapOps.default$(Map.scala:288)
            at scala.collection.AbstractMap.default(Map.scala:420)
            at scala.collection.MapOps.apply(Map.scala:176)
            at scala.collection.MapOps.apply$(Map.scala:175)
            at scala.collection.AbstractMap.apply(Map.scala:420)
            at org.apache.spark.sql.pipelines.graph.GraphExecution.incrementFlowToNumConsecutiveFailure(GraphExecution.scala:52)
            at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1(GraphExecution.scala:92)
            at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1$adapted(GraphExecution.scala:90)
            at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:840)
    ```
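
    The failure is generic `scala.collection.mutable.Map` behavior rather than anything pipeline-specific: `apply` on a missing key falls back to `default`, which throws. A tiny illustrative repro (hypothetical key names):
    ```scala
    import scala.collection.mutable

    val m = mutable.Map.empty[String, Int]

    // The old read-then-put pattern: the read throws on the first retry,
    // because the key was never inserted.
    // m("flow_a") + 1  // java.util.NoSuchElementException: key not found: flow_a

    // `getOrElse` would also avoid the exception, but `updateWith` does the
    // read-modify-write against the map in a single call.
    m.put("flow_a", m.getOrElse("flow_a", 0) + 1) // m("flow_a") == 1
    ```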
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Ran the pipeline manually with a failing flow; also added a test in `TriggeredGraphExecutionSuite` that verifies the event levels of consecutive flow failures.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52094 from JiaqiWang18/sdp-graphexecution-incrementflow-map-default.
    
    Authored-by: Jacky Wang <jacky.w...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/pipelines/graph/GraphExecution.scala |  5 +++-
 .../graph/TriggeredGraphExecutionSuite.scala       | 31 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index 4c969f1bbefd..c687c7f01ed7 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -49,7 +49,10 @@ abstract class GraphExecution(
 
   /** Increments flow execution retry count for `flow`. */
   private def incrementFlowToNumConsecutiveFailure(flowIdentifier: TableIdentifier): Unit = {
-    flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1)
+    flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+      case Some(count) => Some(count + 1)
+      case None => Some(1)
+    }
   }
 
   /**
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
index 4aaa139378b9..4fcd9dad93fe 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableC
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
 import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
-import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.logging.{EventLevel, FlowProgress}
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -1027,4 +1027,33 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession
       }
     )
   }
+
+  test("consecutive failure event level is correct") {
+    val session = spark
+    import session.implicits._
+
+    val pipelineDef = new TestGraphRegistrationContext(spark) {
+      registerMaterializedView(
+        "retry_test",
+        partitionCols = Some(Seq("nonexistent_col")),
+        query = dfFlowFunc(spark.range(5).withColumn("id_mod", ($"id" % 2).cast("int")))
+      )
+    }
+
+    val graph = pipelineDef.toDataflowGraph
+    val updateContext = TestPipelineUpdateContext(spark, graph)
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val failedEvents = updateContext.eventBuffer.getEvents.filter { e =>
+      e.details.isInstanceOf[FlowProgress] &&
+      e.details.asInstanceOf[FlowProgress].status == FlowStatus.FAILED
+    }
+
+    val warnCount = failedEvents.count(_.level == EventLevel.WARN)
+    // flowToNumConsecutiveFailure ensures that the last failure is logged as ERROR
+    val errorCount = failedEvents.count(_.level == EventLevel.ERROR)
+
+    assert(warnCount == 2 && errorCount == 1)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
