This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cecf758b4323 [SPARK-55119][SQL] Fix Continue Handler: prevent 
INTERNAL_ERROR and incorrect conditional statements interruption
cecf758b4323 is described below

commit cecf758b4323417a894c37641d760c723b530015
Author: Milan Dankovic <[email protected]>
AuthorDate: Wed Jan 28 20:41:06 2026 +0800

    [SPARK-55119][SQL] Fix Continue Handler: prevent INTERNAL_ERROR and 
incorrect conditional statements interruption
    
    ### What changes were proposed in this pull request?
    
    This PR fixes two issues with `CONTINUE` handler behavior in SQL scripting 
when exceptions occur in or around conditional statements:
    
    **Issue 1: INTERNAL_ERROR when CONTINUE handler interrupts conditional 
statement at end of loop body**
    - When a CONTINUE handler processes an exception in a conditional 
statement's condition (e.g., `IF (1/0)==1 THEN`), the conditional is interrupted
    - If the conditional is the last statement in a loop body (WHILE/REPEAT), 
the loop would call `next()` on an exhausted iterator after the handler 
completes
    - This caused: `INTERNAL_ERROR: No more elements to iterate through in the 
current SQL compound statement`
    
    **Fix:** Added `hasNext` checks in WHILE and REPEAT loop iterators before 
calling `next()` on the body. If the body is exhausted, return 
`NoOpStatementExec` and transition back to the condition state instead of 
calling `next()`.
    
    **Issue 2: Incorrect interruption of conditional statements when exception 
occurs before them**
    - When an exception occurs **before** a conditional statement (e.g., 
`SIGNAL SQLSTATE '02000'; WHILE ... DO`), the conditional was incorrectly 
interrupted
    - This caused the loop statement body to be skipped even though the 
exception didn't occur during its evaluation
    - The interrupt logic couldn't distinguish between "exception during 
evaluation" and "exception before reaching the statement"
    
    **Fix:** Added explicit tracking of when evaluation starts:
    - `ForStatementExec`: Added `hasStartedQueryEvaluation` flag, set when 
`cachedQueryResult()` begins evaluating the FOR loop's query
    - `SimpleCaseStatementExec`: Added `hasStartedCaseVariableEvaluation` flag, 
set when `validateCache()` begins evaluating the case variable
    - Updated `interruptConditionalStatements()` to check these flags - only 
interrupt if evaluation was actually attempted
    
    ### Why are the changes needed?
    Cursor support depends on correct `CONTINUE HANDLER` behavior. This fix 
is needed to unblock development of cursor support.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added comprehensive test coverage in `SqlScriptingExecutionSuite` that 
tests both **Issue 1** and **Issue 2**.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #53894 from miland-db/milan-dankovic_data/double-advancement-bug.
    
    Authored-by: Milan Dankovic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/scripting/SqlScriptingExecution.scala      |  42 ++-
 .../sql/scripting/SqlScriptingExecutionNode.scala  |  41 +++
 .../sql/scripting/SqlScriptingExecutionSuite.scala | 338 +++++++++++++++++++++
 3 files changed, 415 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
index c8f7172e59bd..1dee2ae80f50 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
@@ -88,7 +88,8 @@ class SqlScriptingExecution(
 
   /**
    * Helper method to execute interrupts to ConditionalStatements.
-   * This method should only interrupt when the statement that throws is a 
conditional statement.
+   * This method should only interrupt when the exception was thrown during 
evaluation of
+   * the conditional statement's condition.
    * @param executionPlan Execution plan.
    */
   private def interruptConditionalStatements(executionPlan: 
NonLeafStatementExec): Unit = {
@@ -101,11 +102,40 @@ class SqlScriptingExecution(
     }
 
     currExecPlan match {
-      case exec: ConditionalStatementExec =>
-        // Only interrupt if the conditional statement is currently evaluating 
its condition.
-        // For loop statements, this means we should skip the loop when an 
exception occurs
-        // during condition evaluation, but NOT when an exception occurs in 
the loop body.
-        if (exec.isInCondition) {
+      case exec: ConditionalStatementExec if exec.isInCondition =>
+        // Only interrupt the conditional if its condition/query was being 
evaluated when the
+        // exception occurred. This distinguishes between two scenarios:
+        //   1. Exception during condition evaluation -> interrupt (skip the 
conditional)
+        //   2. Exception before reaching the conditional -> don't interrupt 
(execute normally)
+        //
+        // Different conditional statements track evaluation state differently:
+        //   - SimpleCaseStatementExec: hasStartedCaseVariableEvaluation flag 
is set when
+        //     validateCache() begins evaluating the case variable expression.
+        //   - ForStatementExec: hasStartedQueryEvaluation flag is set when 
cachedQueryResult()
+        //     begins evaluating the FOR loop's query.
+        //   - IF/ELSEIF, WHILE, REPEAT, SEARCHED CASE: curr.isExecuted flag 
is set by
+        //     evaluateBooleanCondition() before evaluating each condition.
+        val shouldInterrupt =
+          exec match {
+            case simpleCaseStmt: SimpleCaseStatementExec
+              if simpleCaseStmt.hasStartedCaseVariableEvaluation =>
+              // Only interrupt if case variable evaluation was attempted.
+              true
+            case forStmt: ForStatementExec =>
+              // Only interrupt if query evaluation was attempted.
+              forStmt.hasStartedQueryEvaluation
+            case _ =>
+              // For IF, WHILE, REPEAT, SEARCHED/SIMPLE CASE: check if 
condition was executed.
+              // evaluateBooleanCondition sets isExecuted=true before 
evaluation, so if an
+              // exception occurs during evaluation, isExecuted will be true. 
If the exception
+              // happened before reaching the conditional, isExecuted will 
still be false.
+              exec.curr match {
+                case Some(stmt: SingleStatementExec) => stmt.isExecuted
+                case _ => false
+              }
+          }
+
+        if (shouldInterrupt) {
           exec.interrupted = true
         }
       case _ =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
index 155db8a68353..7a2bd26f18c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
@@ -556,6 +556,18 @@ class WhileStatementExec(
                 throw SparkException.internalError("Unexpected statement type 
in WHILE condition.")
             }
           case WhileState.Body =>
+            // Check if body has more statements before calling next(). When 
an exception in a
+            // conditional statement's condition is handled by a CONTINUE 
handler, the conditional
+            // is interrupted. If it's the last statement in the loop body, 
calling next() on the
+            // exhausted iterator would fail. Instead, we return 
NoOpStatementExec and transition
+            // back to the condition.
+            if (!body.getTreeIterator.hasNext) {
+              state = WhileState.Condition
+              curr = Some(condition)
+              condition.reset()
+              return new NoOpStatementExec
+            }
+
             val retStmt = body.getTreeIterator.next()
 
             // Handle LEAVE or ITERATE statement if it has been encountered.
@@ -718,15 +730,23 @@ class SimpleCaseStatementExec(
   private var conditionBodyTupleIterator: Iterator[(SingleStatementExec, 
CompoundBodyExec)] = _
   private var caseVariableLiteral: Literal = _
 
+  // Flag to track if case variable evaluation has been attempted. Used by 
CONTINUE handler
+  // mechanism to determine if an exception occurred during case variable 
evaluation vs. before
+  // the CASE statement was reached.
+  protected[scripting] var hasStartedCaseVariableEvaluation: Boolean = false
+
   private var isCacheValid = false
   private def validateCache(): Unit = {
     if (!isCacheValid) {
+      // Set flag before evaluation so CONTINUE handler can detect if 
exception happened here.
+      hasStartedCaseVariableEvaluation = true
       val values = caseVariableExec.buildDataFrame(session).collect()
       caseVariableExec.isExecuted = true
 
       caseVariableLiteral = Literal(values.head.get(0))
       conditionBodyTupleIterator = createConditionBodyIterator
       isCacheValid = true
+      hasStartedCaseVariableEvaluation = false
     }
   }
 
@@ -828,6 +848,7 @@ class SimpleCaseStatementExec(
     caseVariableExec.reset()
     conditionalBodies.foreach(b => b.reset())
     elseBody.foreach(b => b.reset())
+    hasStartedCaseVariableEvaluation = false
   }
 }
 
@@ -878,6 +899,18 @@ class RepeatStatementExec(
               throw SparkException.internalError("Unexpected statement type in 
REPEAT condition.")
           }
         case RepeatState.Body =>
+          // Check if body has more statements before calling next(). When an 
exception in a
+          // conditional statement's condition is handled by a CONTINUE 
handler, the conditional
+          // is interrupted. If it's the last statement in the loop body, 
calling next() on the
+          // exhausted iterator would fail. Instead, we return 
NoOpStatementExec and transition
+          // back to the condition.
+          if (!body.getTreeIterator.hasNext) {
+            state = RepeatState.Condition
+            curr = Some(condition)
+            condition.reset()
+            return new NoOpStatementExec
+          }
+
           val retStmt = body.getTreeIterator.next()
 
           retStmt match {
@@ -1045,17 +1078,24 @@ class ForStatementExec(
   }
   private var state = ForState.VariableAssignment
 
+  // Flag to track if FOR query evaluation has been attempted. Used by 
CONTINUE handler
+  // mechanism to determine if an exception occurred during query evaluation 
vs. before
+  // the FOR statement was reached.
+  protected[scripting] var hasStartedQueryEvaluation = false
   private var queryResult: util.Iterator[Row] = _
   private var queryColumnNameToDataType: Map[String, DataType] = _
   private var isResultCacheValid = false
   private def cachedQueryResult(): util.Iterator[Row] = {
     if (!isResultCacheValid) {
+      // Set flag before evaluation so CONTINUE handler can detect if 
exception happened here.
+      hasStartedQueryEvaluation = true
       val df = query.buildDataFrame(session)
       queryResult = df.toLocalIterator()
       queryColumnNameToDataType = df.schema.fields.map(f => f.name -> 
f.dataType).toMap
 
       query.isExecuted = true
       isResultCacheValid = true
+      hasStartedQueryEvaluation = false
     }
     queryResult
   }
@@ -1253,6 +1293,7 @@ class ForStatementExec(
     curr = None
     bodyWithVariables = None
     firstIteration = true
+    hasStartedQueryEvaluation = false
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
index db1dcc7dc945..1992d02300a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
@@ -402,6 +402,310 @@ class SqlScriptingExecutionSuite extends QueryTest with 
SharedSparkSession {
     verifySqlScriptResult(sqlScript, expected = expected)
   }
 
+  test("continue handler - Exception in FOR loop query evaluation") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE i = 0;
+        |  DECLARE handled = 0;
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |    SET handled = handled + 1;
+        |
+        |  WHILE i < 2 DO
+        |    SET i = i + 1;
+        |    FOR o AS SELECT 1/0 DO
+        |      SELECT 1;
+        |    END FOR;
+        |  END WHILE;
+        |
+        |  SELECT handled;
+        |END
+        |""".stripMargin
+    val expected = Seq(Seq(Row(2)))
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - WHILE loop with exception in IF condition") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE i INT DEFAULT 0;
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |    SELECT 22;
+        |
+        |  WHILE i < 2 DO
+        |    SET i = i + 1;
+        |
+        |    IF (1/0)==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  END WHILE;
+        |
+        |  SELECT i, s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),  // handler executed on first iteration
+      Seq(Row(22)),  // handler executed on second iteration
+      Seq(Row(2, 0)) // final select: i=2, s=0 (never incremented)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - REPEAT loop with exception in IF condition") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE i INT DEFAULT 0;
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |    SELECT 22;
+        |
+        |  REPEAT
+        |    SET i = i + 1;
+        |
+        |    IF (1/0)==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  UNTIL i >= 2
+        |  END REPEAT;
+        |
+        |  SELECT i, s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),  // handler executed on first iteration
+      Seq(Row(22)),  // handler executed on second iteration
+      Seq(Row(2, 0)) // final select: i=2, s=0 (never incremented)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - WHILE loop with zero division before IF") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE i INT DEFAULT 0;
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |  BEGIN
+        |    SELECT 22;
+        |  END;
+        |
+        |  WHILE i < 2 DO
+        |    SET i = i + 1;
+        |
+        |    SELECT 1/0;
+        |
+        |    IF 1==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  END WHILE;
+        |
+        |  SELECT i, s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),   // handler executed on first iteration
+      Seq(Row(22)),   // handler executed on second iteration
+      Seq(Row(2, 10)) // final select: i=2, s=10 (incremented twice)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - REPEAT loop with zero division before IF") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE i INT DEFAULT 0;
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |  BEGIN
+        |    SELECT 22;
+        |  END;
+        |
+        |  REPEAT
+        |    SET i = i + 1;
+        |
+        |    SELECT 1/0;
+        |
+        |    IF 1==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  UNTIL i >= 2
+        |  END REPEAT;
+        |
+        |  SELECT i, s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),   // handler executed on first iteration
+      Seq(Row(22)),   // handler executed on second iteration
+      Seq(Row(2, 10)) // final select: i=2, s=10 (incremented twice)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - FOR loop with exception in IF condition") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |    SELECT 22;
+        |
+        |  FOR i AS VALUES (1), (2) DO
+        |    SELECT 1;
+        |    IF (1/0)==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  END FOR;
+        |
+        |  SELECT s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(1)),   // SELECT 1 on first iteration
+      Seq(Row(22)),  // handler executed on first iteration
+      Seq(Row(1)),   // SELECT 1 on second iteration
+      Seq(Row(22)),  // handler executed on second iteration
+      Seq(Row(0))    // final select: s=0 (never incremented)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - FOR loop with zero division before IF") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |  BEGIN
+        |    SELECT 22;
+        |  END;
+        |
+        |  FOR i AS VALUES (1), (2) DO
+        |    SELECT 1;
+        |    SELECT 1/0;
+        |
+        |    IF 1==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  END FOR;
+        |
+        |  SELECT 10;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(1)),   // SELECT 1 on first iteration
+      Seq(Row(22)),  // handler executed on first iteration
+      Seq(Row(1)),   // SELECT 1 on second iteration
+      Seq(Row(22)),  // handler executed on second iteration
+      Seq(Row(10))   // final select: literal 10 (script runs SELECT 10; s is not verified)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - zero division before FOR loop") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE s INT DEFAULT 0;
+        |
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |  BEGIN
+        |    SELECT 22;
+        |  END;
+        |
+        |  SELECT 1/0;
+        |
+        |  FOR i AS VALUES (1), (2) DO
+        |    SELECT 1;
+        |    IF 1==1 THEN
+        |      SET s = s + 5;
+        |    END IF;
+        |  END FOR;
+        |
+        |  SELECT s;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),  // handler executed
+      Seq(Row(1)),   // SELECT 1 on first iteration
+      Seq(Row(1)),   // SELECT 1 on second iteration
+      Seq(Row(10))   // final select: s=10 (incremented twice)
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
+  test("continue handler - zero division before simple case") {
+    withTable("t") {
+      val commands =
+        """
+          |BEGIN
+          |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+          |  BEGIN
+          |    SELECT 22;
+          |  END;
+          |
+          |  CREATE TABLE t (a INT, b STRING, c DOUBLE) USING parquet;
+          |  INSERT INTO t VALUES (1, 'a', 1.0);
+          |  INSERT INTO t VALUES (2, 'b', 2.0);
+          |
+          |  SELECT 1/0;
+          |  CASE (SELECT COUNT(*) FROM t)
+          |   WHEN 1 THEN
+          |     SELECT 42;
+          |   WHEN 3 THEN
+          |     SELECT 43;
+          |   ELSE
+          |     SELECT 44;
+          |  END CASE;
+          |END
+          |""".stripMargin
+      val expected = Seq(
+        Seq(Row(22)),  // handler executed
+        Seq(Row(44))
+      )
+      verifySqlScriptResult(commands, expected)
+    }
+  }
+
+  test("continue handler - zero division before searched case") {
+    val commands =
+      """
+        |BEGIN
+        |  DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+        |  BEGIN
+        |    SELECT 22;
+        |  END;
+        |
+        |  SELECT 1/0;
+        |
+        |  CASE
+        |    WHEN 1 = (SELECT 2) THEN
+        |      SELECT 1;
+        |    WHEN 2 = 2 THEN
+        |      SELECT 42;
+        |    WHEN (SELECT * FROM t) THEN
+        |      SELECT * FROM b;
+        |  END CASE;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(22)),  // handler executed
+      Seq(Row(42))
+    )
+    verifySqlScriptResult(commands, expected)
+  }
+
   test("handler - exit resolve in outer block") {
     val sqlScript =
       """
@@ -972,6 +1276,40 @@ class SqlScriptingExecutionSuite extends QueryTest with 
SharedSparkSession {
     verifySqlScriptResult(sqlScript, expected = expected)
   }
 
+  test("continue handler - continue when exception happens in simple case 
body") {
+    val sqlScript =
+      """
+        |BEGIN
+        |  DECLARE VARIABLE flag INT = -1;
+        |  DECLARE VARIABLE x INT = 1;
+        |  DECLARE CONTINUE HANDLER FOR SQLSTATE '22012'
+        |  BEGIN
+        |    SELECT 22;
+        |    SET flag = 1;
+        |  END;
+        |
+        |  CASE x
+        |    WHEN flag THEN
+        |      SELECT 10;
+        |    WHEN 1 THEN
+        |      SELECT 11;
+        |      SELECT 1/0;
+        |      SELECT 33;
+        |    ELSE
+        |      SELECT 12;
+        |  END CASE;
+        |  SELECT flag;
+        |END
+        |""".stripMargin
+    val expected = Seq(
+      Seq(Row(11)),
+      Seq(Row(22)),
+      Seq(Row(33)),
+      Seq(Row(1))
+    )
+    verifySqlScriptResult(sqlScript, expected = expected)
+  }
+
   test("exit handler - exit resolve when simple case condition computation 
fails") {
     val sqlScript =
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to