This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4ba38c2c3305 [SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries
4ba38c2c3305 is described below

commit 4ba38c2c33055a853c7d39a974c670914dc8a049
Author: Jacky Wang <jacky.w...@databricks.com>
AuthorDate: Mon Sep 1 23:13:28 2025 -0700

    [SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries

### What changes were proposed in this pull request?

Fixes the flow failure counter increment by switching to `updateWith`, which prevents the first increment from throwing a `NoSuchElementException` when the key does not exist yet.

### Why are the changes needed?

The current implementation, `flowToNumConsecutiveFailure(flowIdentifier)`, throws an exception the first time a flow is retried:

```
java.util.NoSuchElementException: key not found: `spark_catalog`.`test_db`.`mv`
	at scala.collection.MapOps.default(Map.scala:289)
	at scala.collection.MapOps.default$(Map.scala:288)
	at scala.collection.AbstractMap.default(Map.scala:420)
	at scala.collection.MapOps.apply(Map.scala:176)
	at scala.collection.MapOps.apply$(Map.scala:175)
	at scala.collection.AbstractMap.apply(Map.scala:420)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.incrementFlowToNumConsecutiveFailure(GraphExecution.scala:52)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1(GraphExecution.scala:92)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1$adapted(GraphExecution.scala:90)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ran a pipeline manually with a failing flow to exercise the retry path.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52094 from JiaqiWang18/sdp-graphexecution-incrementflow-map-default.

Authored-by: Jacky Wang <jacky.w...@databricks.com>
Signed-off-by: Gengliang Wang <gengli...@apache.org>
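For context on the fix: `scala.collection.mutable.Map.updateWith` (available since Scala 2.13) maps the current `Option[V]` at a key to a new value in a single lookup, so the absent-key case can seed the counter instead of throwing. Below is a minimal standalone sketch of the counter pattern this commit adopts; the object, map, and method names (`UpdateWithCounterSketch`, `failureCounts`, `recordFailure`) are illustrative stand-ins, not the actual `GraphExecution` members.

```scala
import scala.collection.mutable

object UpdateWithCounterSketch {
  // Illustrative stand-in for GraphExecution.flowToNumConsecutiveFailure.
  private val failureCounts = mutable.Map.empty[String, Int]

  // Mirrors the fixed increment: an existing count is bumped, and a
  // missing key starts at 1 instead of throwing NoSuchElementException.
  def recordFailure(flow: String): Unit = {
    failureCounts.updateWith(flow) {
      case Some(count) => Some(count + 1)
      case None        => Some(1)
    }
  }

  def main(args: Array[String]): Unit = {
    recordFailure("mv")          // first failure: key absent, count becomes 1
    recordFailure("mv")          // second failure: count becomes 2
    println(failureCounts("mv")) // prints 2

    // The pre-fix pattern would throw here for a fresh key, because
    // apply() on a missing key falls through to MapOps.default:
    // failureCounts.put("other", failureCounts("other") + 1)
  }
}
```

The old `put(key, map(key) + 1)` pattern calls `apply` on a possibly missing key, which is exactly the `MapOps.default` failure shown in the stack trace above. An equivalent alternative would be `map(key) = map.getOrElse(key, 0) + 1`; `updateWith` keeps the read-modify-write in one call.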
---
 .../spark/sql/pipelines/graph/GraphExecution.scala |  5 +++-
 .../graph/TriggeredGraphExecutionSuite.scala       | 31 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index 4c969f1bbefd..c687c7f01ed7 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -49,7 +49,10 @@ abstract class GraphExecution(
   /** Increments flow execution retry count for `flow`.
    */
   private def incrementFlowToNumConsecutiveFailure(flowIdentifier: TableIdentifier): Unit = {
-    flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1)
+    flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+      case Some(count) => Some(count + 1)
+      case None => Some(1)
+    }
   }
 
   /**
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
index 4aaa139378b9..4fcd9dad93fe 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableC
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
 import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
-import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.logging.{EventLevel, FlowProgress}
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -1027,4 +1027,33 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession
       }
     )
   }
+
+  test("consecutive failure event level is correct") {
+    val session = spark
+    import session.implicits._
+
+    val pipelineDef = new TestGraphRegistrationContext(spark) {
+      registerMaterializedView(
+        "retry_test",
+        partitionCols = Some(Seq("nonexistent_col")),
+        query = dfFlowFunc(spark.range(5).withColumn("id_mod", ($"id" % 2).cast("int")))
+      )
+    }
+
+    val graph = pipelineDef.toDataflowGraph
+    val updateContext = TestPipelineUpdateContext(spark, graph)
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val failedEvents = updateContext.eventBuffer.getEvents.filter { e =>
+      e.details.isInstanceOf[FlowProgress] &&
+      e.details.asInstanceOf[FlowProgress].status == FlowStatus.FAILED
+    }
+
+    val warnCount = failedEvents.count(_.level == EventLevel.WARN)
+    // flowToNumConsecutiveFailure controls that the last failure should be logged as ERROR
+    val errorCount = failedEvents.count(_.level == EventLevel.ERROR)
+
+    assert(warnCount == 2 && errorCount == 1)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org