This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a2fa28dcd355 [SPARK-55119][SQL] Fix Continue Handler: prevent
INTERNAL_ERROR and incorrect conditional statements interruption
a2fa28dcd355 is described below
commit a2fa28dcd35554b0fc28992944b560e27fb9c572
Author: Milan Dankovic <[email protected]>
AuthorDate: Wed Jan 28 20:41:06 2026 +0800
[SPARK-55119][SQL] Fix Continue Handler: prevent INTERNAL_ERROR and
incorrect conditional statements interruption
### What changes were proposed in this pull request?
This PR fixes two issues with `CONTINUE` handler behavior in SQL scripting
when exceptions occur in or around conditional statements:
**Issue 1: INTERNAL_ERROR when CONTINUE handler interrupts conditional
statement at end of loop body**
- When a CONTINUE handler processes an exception in a conditional
statement's condition (e.g., `IF (1/0)==1 THEN`), the conditional is interrupted
- If the conditional is the last statement in a loop body (WHILE/REPEAT),
the loop would call `next()` on an exhausted iterator after the handler
completes
- This caused: `INTERNAL_ERROR: No more elements to iterate through in the
current SQL compound statement`
**Fix:** Added `hasNext` checks in WHILE and REPEAT loop iterators before
calling `next()` on the body. If the body is exhausted, return
`NoOpStatementExec` and transition back to the condition state instead of
calling `next()`.
**Issue 2: Incorrect interruption of conditional statements when exception
occurs before them**
- When an exception occurs **before** a conditional statement (e.g.,
`SIGNAL SQLSTATE '02000'; WHILE ... DO`), the conditional was incorrectly
interrupted
- This caused the loop statement body to be skipped even though the
exception didn't occur during its evaluation
- The interrupt logic couldn't distinguish between "exception during
evaluation" and "exception before reaching the statement"
**Fix:** Added explicit tracking of when evaluation starts:
- `ForStatementExec`: Added `hasStartedQueryEvaluation` flag, set when
`cachedQueryResult()` begins evaluating the FOR loop's query
- `SimpleCaseStatementExec`: Added `hasStartedCaseVariableEvaluation` flag,
set when `validateCache()` begins evaluating the case variable
- Updated `interruptConditionalStatements()` to check these flags - only
interrupt if evaluation was actually attempted
### Why are the changes needed?
Cursor support depends on correct `CONTINUE HANDLER` behavior. This fix is
needed to unblock development of cursor support.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added comprehensive test coverage in `SqlScriptingExecutionSuite` that
tests both **Issue 1** and **Issue 2**.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53894 from miland-db/milan-dankovic_data/double-advancement-bug.
Authored-by: Milan Dankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit cecf758b4323417a894c37641d760c723b530015)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/scripting/SqlScriptingExecution.scala | 42 ++-
.../sql/scripting/SqlScriptingExecutionNode.scala | 41 +++
.../sql/scripting/SqlScriptingExecutionSuite.scala | 338 +++++++++++++++++++++
3 files changed, 415 insertions(+), 6 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
index c8f7172e59bd..1dee2ae80f50 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
@@ -88,7 +88,8 @@ class SqlScriptingExecution(
/**
* Helper method to execute interrupts to ConditionalStatements.
- * This method should only interrupt when the statement that throws is a
conditional statement.
+ * This method should only interrupt when the exception was thrown during
evaluation of
+ * the conditional statement's condition.
* @param executionPlan Execution plan.
*/
private def interruptConditionalStatements(executionPlan:
NonLeafStatementExec): Unit = {
@@ -101,11 +102,40 @@ class SqlScriptingExecution(
}
currExecPlan match {
- case exec: ConditionalStatementExec =>
- // Only interrupt if the conditional statement is currently evaluating
its condition.
- // For loop statements, this means we should skip the loop when an
exception occurs
- // during condition evaluation, but NOT when an exception occurs in
the loop body.
- if (exec.isInCondition) {
+ case exec: ConditionalStatementExec if exec.isInCondition =>
+ // Only interrupt the conditional if its condition/query was being
evaluated when the
+ // exception occurred. This distinguishes between two scenarios:
+ // 1. Exception during condition evaluation -> interrupt (skip the
conditional)
+ // 2. Exception before reaching the conditional -> don't interrupt
(execute normally)
+ //
+ // Different conditional statements track evaluation state differently:
+ // - SimpleCaseStatementExec: hasStartedCaseVariableEvaluation flag
is set when
+ // validateCache() begins evaluating the case variable expression.
+ // - ForStatementExec: hasStartedQueryEvaluation flag is set when
cachedQueryResult()
+ // begins evaluating the FOR loop's query.
+ // - IF/ELSEIF, WHILE, REPEAT, SEARCHED CASE: curr.isExecuted flag
is set by
+ // evaluateBooleanCondition() before evaluating each condition.
+ val shouldInterrupt =
+ exec match {
+ case simpleCaseStmt: SimpleCaseStatementExec
+ if simpleCaseStmt.hasStartedCaseVariableEvaluation =>
+ // Only interrupt if case variable evaluation was attempted.
+ true
+ case forStmt: ForStatementExec =>
+ // Only interrupt if query evaluation was attempted.
+ forStmt.hasStartedQueryEvaluation
+ case _ =>
+ // For IF, WHILE, REPEAT, SEARCHED/SIMPLE CASE: check if
condition was executed.
+ // evaluateBooleanCondition sets isExecuted=true before
evaluation, so if an
+ // exception occurs during evaluation, isExecuted will be true.
If the exception
+ // happened before reaching the conditional, isExecuted will
still be false.
+ exec.curr match {
+ case Some(stmt: SingleStatementExec) => stmt.isExecuted
+ case _ => false
+ }
+ }
+
+ if (shouldInterrupt) {
exec.interrupted = true
}
case _ =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
index c47df4b7a89e..953301ab8ed5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
@@ -554,6 +554,18 @@ class WhileStatementExec(
throw SparkException.internalError("Unexpected statement type
in WHILE condition.")
}
case WhileState.Body =>
+ // Check if body has more statements before calling next(). When
an exception in a
+ // conditional statement's condition is handled by a CONTINUE
handler, the conditional
+ // is interrupted. If it's the last statement in the loop body,
calling next() on the
+ // exhausted iterator would fail. Instead, we return
NoOpStatementExec and transition
+ // back to the condition.
+ if (!body.getTreeIterator.hasNext) {
+ state = WhileState.Condition
+ curr = Some(condition)
+ condition.reset()
+ return new NoOpStatementExec
+ }
+
val retStmt = body.getTreeIterator.next()
// Handle LEAVE or ITERATE statement if it has been encountered.
@@ -716,15 +728,23 @@ class SimpleCaseStatementExec(
private var conditionBodyTupleIterator: Iterator[(SingleStatementExec,
CompoundBodyExec)] = _
private var caseVariableLiteral: Literal = _
+ // Flag to track if case variable evaluation has been attempted. Used by
CONTINUE handler
+ // mechanism to determine if an exception occurred during case variable
evaluation vs. before
+ // the CASE statement was reached.
+ protected[scripting] var hasStartedCaseVariableEvaluation: Boolean = false
+
private var isCacheValid = false
private def validateCache(): Unit = {
if (!isCacheValid) {
+ // Set flag before evaluation so CONTINUE handler can detect if
exception happened here.
+ hasStartedCaseVariableEvaluation = true
val values = caseVariableExec.buildDataFrame(session).collect()
caseVariableExec.isExecuted = true
caseVariableLiteral = Literal(values.head.get(0))
conditionBodyTupleIterator = createConditionBodyIterator
isCacheValid = true
+ hasStartedCaseVariableEvaluation = false
}
}
@@ -826,6 +846,7 @@ class SimpleCaseStatementExec(
caseVariableExec.reset()
conditionalBodies.foreach(b => b.reset())
elseBody.foreach(b => b.reset())
+ hasStartedCaseVariableEvaluation = false
}
}
@@ -876,6 +897,18 @@ class RepeatStatementExec(
throw SparkException.internalError("Unexpected statement type in
REPEAT condition.")
}
case RepeatState.Body =>
+ // Check if body has more statements before calling next(). When an
exception in a
+ // conditional statement's condition is handled by a CONTINUE
handler, the conditional
+ // is interrupted. If it's the last statement in the loop body,
calling next() on the
+ // exhausted iterator would fail. Instead, we return
NoOpStatementExec and transition
+ // back to the condition.
+ if (!body.getTreeIterator.hasNext) {
+ state = RepeatState.Condition
+ curr = Some(condition)
+ condition.reset()
+ return new NoOpStatementExec
+ }
+
val retStmt = body.getTreeIterator.next()
retStmt match {
@@ -1043,17 +1076,24 @@ class ForStatementExec(
}
private var state = ForState.VariableAssignment
+ // Flag to track if FOR query evaluation has been attempted. Used by
CONTINUE handler
+ // mechanism to determine if an exception occurred during query evaluation
vs. before
+ // the FOR statement was reached.
+ protected[scripting] var hasStartedQueryEvaluation = false
private var queryResult: util.Iterator[Row] = _
private var queryColumnNameToDataType: Map[String, DataType] = _
private var isResultCacheValid = false
private def cachedQueryResult(): util.Iterator[Row] = {
if (!isResultCacheValid) {
+ // Set flag before evaluation so CONTINUE handler can detect if
exception happened here.
+ hasStartedQueryEvaluation = true
val df = query.buildDataFrame(session)
queryResult = df.toLocalIterator()
queryColumnNameToDataType = df.schema.fields.map(f => f.name ->
f.dataType).toMap
query.isExecuted = true
isResultCacheValid = true
+ hasStartedQueryEvaluation = false
}
queryResult
}
@@ -1250,6 +1290,7 @@ class ForStatementExec(
curr = None
bodyWithVariables = None
firstIteration = true
+ hasStartedQueryEvaluation = false
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
index 2ad715f671ed..5ed84db96836 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
@@ -403,6 +403,310 @@ class SqlScriptingExecutionSuite extends QueryTest with
SharedSparkSession {
verifySqlScriptResult(sqlScript, expected = expected)
}
+ // Checks that a CONTINUE handler for DIVIDE_BY_ZERO runs once per WHILE iteration when
+ // the nested FOR loop's query (SELECT 1/0) fails, and that the script still reaches the
+ // final SELECT. Two iterations are expected to leave handled = 2.
+ test("continue handler - Exception in FOR loop query evaluation") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE i = 0;
+ | DECLARE handled = 0;
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | SET handled = handled + 1;
+ |
+ | WHILE i < 2 DO
+ | SET i = i + 1;
+ | FOR o AS SELECT 1/0 DO
+ | SELECT 1;
+ | END FOR;
+ | END WHILE;
+ |
+ | SELECT handled;
+ |END
+ |""".stripMargin
+ val expected = Seq(Seq(Row(2)))
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks the case where an IF whose condition raises DIVIDE_BY_ZERO is the last statement
+ // of a WHILE body: the CONTINUE handler runs on each iteration, the IF body is never
+ // executed (s stays 0), and the loop still terminates normally with i = 2.
+ test("continue handler - WHILE loop with exception in IF condition") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE i INT DEFAULT 0;
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | SELECT 22;
+ |
+ | WHILE i < 2 DO
+ | SET i = i + 1;
+ |
+ | IF (1/0)==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | END WHILE;
+ |
+ | SELECT i, s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(2, 0)) // final select: i=2, s=0 (never incremented)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // REPEAT counterpart of the WHILE test: an IF whose condition raises DIVIDE_BY_ZERO is
+ // the last statement of the REPEAT body. The CONTINUE handler runs on each iteration,
+ // the IF body never executes (s stays 0), and the loop exits once i >= 2.
+ test("continue handler - REPEAT loop with exception in IF condition") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE i INT DEFAULT 0;
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | SELECT 22;
+ |
+ | REPEAT
+ | SET i = i + 1;
+ |
+ | IF (1/0)==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | UNTIL i >= 2
+ | END REPEAT;
+ |
+ | SELECT i, s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(2, 0)) // final select: i=2, s=0 (never incremented)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks that an exception raised by a statement *before* an IF (SELECT 1/0) does not
+ // cause the following IF to be skipped: after the CONTINUE handler runs, the IF body
+ // executes on both WHILE iterations, so s ends at 10.
+ test("continue handler - WHILE loop with zero division before IF") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE i INT DEFAULT 0;
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | WHILE i < 2 DO
+ | SET i = i + 1;
+ |
+ | SELECT 1/0;
+ |
+ | IF 1==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | END WHILE;
+ |
+ | SELECT i, s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(2, 10)) // final select: i=2, s=10 (incremented twice)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // REPEAT counterpart of the WHILE test: an exception raised by a statement before an IF
+ // (SELECT 1/0) must not cause the following IF to be skipped. The IF body executes on
+ // both iterations after the CONTINUE handler runs, so s ends at 10.
+ test("continue handler - REPEAT loop with zero division before IF") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE i INT DEFAULT 0;
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | REPEAT
+ | SET i = i + 1;
+ |
+ | SELECT 1/0;
+ |
+ | IF 1==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | UNTIL i >= 2
+ | END REPEAT;
+ |
+ | SELECT i, s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(2, 10)) // final select: i=2, s=10 (incremented twice)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks the case where an IF whose condition raises DIVIDE_BY_ZERO is the last statement
+ // of a FOR body: on each of the two iterations SELECT 1 runs, the CONTINUE handler runs,
+ // and the IF body is skipped, so s is still 0 in the final SELECT.
+ test("continue handler - FOR loop with exception in IF condition") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | SELECT 22;
+ |
+ | FOR i AS VALUES (1), (2) DO
+ | SELECT 1;
+ | IF (1/0)==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | END FOR;
+ |
+ | SELECT s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(1)), // SELECT 1 on first iteration
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(1)), // SELECT 1 on second iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(0)) // final select: s=0 (never incremented)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks that an exception raised by a statement before an IF inside a FOR body
+ // (SELECT 1/0) does not cause the following IF to be skipped: after the CONTINUE handler
+ // runs, the IF body must execute on both iterations. The final statement selects `s`
+ // (not a literal) so the test fails if the IF body is wrongly skipped.
+ test("continue handler - FOR loop with zero division before IF") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | FOR i AS VALUES (1), (2) DO
+ | SELECT 1;
+ | SELECT 1/0;
+ |
+ | IF 1==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | END FOR;
+ |
+ | SELECT s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(1)), // SELECT 1 on first iteration
+ Seq(Row(22)), // handler executed on first iteration
+ Seq(Row(1)), // SELECT 1 on second iteration
+ Seq(Row(22)), // handler executed on second iteration
+ Seq(Row(10)) // final select: s=10 (incremented twice)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks that an exception raised by the statement immediately before a FOR loop
+ // (SELECT 1/0) does not cause the FOR loop to be skipped: after the CONTINUE handler
+ // runs, both iterations execute and the IF body increments s to 10.
+ test("continue handler - zero division before FOR loop") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE s INT DEFAULT 0;
+ |
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | SELECT 1/0;
+ |
+ | FOR i AS VALUES (1), (2) DO
+ | SELECT 1;
+ | IF 1==1 THEN
+ | SET s = s + 5;
+ | END IF;
+ | END FOR;
+ |
+ | SELECT s;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed
+ Seq(Row(1)), // SELECT 1 on first iteration
+ Seq(Row(1)), // SELECT 1 on second iteration
+ Seq(Row(10)) // final select: s=10 (incremented twice)
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
+ // Checks that an exception raised by the statement immediately before a simple CASE
+ // (SELECT 1/0) does not cause the CASE to be skipped. Table t holds two rows, so the
+ // case variable COUNT(*) matches neither WHEN 1 nor WHEN 3 and the ELSE branch
+ // (SELECT 44) is expected to run after the handler.
+ test("continue handler - zero division before simple case") {
+ withTable("t") {
+ val commands =
+ """
+ |BEGIN
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | CREATE TABLE t (a INT, b STRING, c DOUBLE) USING parquet;
+ | INSERT INTO t VALUES (1, 'a', 1.0);
+ | INSERT INTO t VALUES (2, 'b', 2.0);
+ |
+ | SELECT 1/0;
+ | CASE (SELECT COUNT(*) FROM t)
+ | WHEN 1 THEN
+ | SELECT 42;
+ | WHEN 3 THEN
+ | SELECT 43;
+ | ELSE
+ | SELECT 44;
+ | END CASE;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed
+ Seq(Row(44)) // ELSE branch: COUNT(*) = 2 matches neither WHEN 1 nor WHEN 3
+ )
+ verifySqlScriptResult(commands, expected)
+ }
+ }
+
+ // Checks that an exception raised by the statement immediately before a searched CASE
+ // (SELECT 1/0) does not cause the CASE to be skipped: after the handler runs, the second
+ // WHEN (2 = 2) is expected to match and produce 42.
+ // NOTE(review): the third WHEN references tables `t` and `b`, which this script never
+ // creates; presumably it is never evaluated because the second branch matches — confirm.
+ test("continue handler - zero division before searched case") {
+ val commands =
+ """
+ |BEGIN
+ | DECLARE CONTINUE HANDLER FOR DIVIDE_BY_ZERO
+ | BEGIN
+ | SELECT 22;
+ | END;
+ |
+ | SELECT 1/0;
+ |
+ | CASE
+ | WHEN 1 = (SELECT 2) THEN
+ | SELECT 1;
+ | WHEN 2 = 2 THEN
+ | SELECT 42;
+ | WHEN (SELECT * FROM t) THEN
+ | SELECT * FROM b;
+ | END CASE;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(22)), // handler executed
+ Seq(Row(42)) // second WHEN branch (2 = 2)
+ )
+ verifySqlScriptResult(commands, expected)
+ }
+
test("handler - exit resolve in outer block") {
val sqlScript =
"""
@@ -973,6 +1277,40 @@ class SqlScriptingExecutionSuite extends QueryTest with
SharedSparkSession {
verifySqlScriptResult(sqlScript, expected = expected)
}
+ // Checks that when a statement inside a matched simple-CASE branch raises SQLSTATE 22012
+ // (SELECT 1/0), the CONTINUE handler runs and execution resumes with the next statement
+ // of that same branch (SELECT 33) instead of abandoning the CASE. The handler sets
+ // flag = 1, which the final SELECT observes.
+ test("continue handler - continue when exception happens in simple case
body") {
+ val sqlScript =
+ """
+ |BEGIN
+ | DECLARE VARIABLE flag INT = -1;
+ | DECLARE VARIABLE x INT = 1;
+ | DECLARE CONTINUE HANDLER FOR SQLSTATE '22012'
+ | BEGIN
+ | SELECT 22;
+ | SET flag = 1;
+ | END;
+ |
+ | CASE x
+ | WHEN flag THEN
+ | SELECT 10;
+ | WHEN 1 THEN
+ | SELECT 11;
+ | SELECT 1/0;
+ | SELECT 33;
+ | ELSE
+ | SELECT 12;
+ | END CASE;
+ | SELECT flag;
+ |END
+ |""".stripMargin
+ val expected = Seq(
+ Seq(Row(11)), // first statement of the matched WHEN 1 branch
+ Seq(Row(22)), // handler executed for SELECT 1/0
+ Seq(Row(33)), // branch continues after the handled exception
+ Seq(Row(1)) // final select: flag set to 1 by the handler
+ )
+ verifySqlScriptResult(sqlScript, expected = expected)
+ }
+
test("exit handler - exit resolve when simple case condition computation
fails") {
val sqlScript =
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]