This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ecf003c5671e [SPARK-55451][SQL] Cursors must start collecting results on OPEN, not first FETCH
ecf003c5671e is described below
commit ecf003c5671e05b37b6fdb04beb7958b803462ce
Author: Serge Rielau <[email protected]>
AuthorDate: Tue Feb 10 14:06:19 2026 -0800
[SPARK-55451][SQL] Cursors must start collecting results on OPEN, not first FETCH
### What changes were proposed in this pull request?
Cursors should use snapshot semantics pinned to the OPEN statement, not to the first FETCH.
This change fixes that.
Ideally we would only collect the file listings at OPEN and not start the scans, so that
runtime errors cannot surface on OPEN, but that is quite difficult. Starting the scans at
OPEN ("do-at-open") is generally acceptable behavior for an ASENSITIVE cursor, though.
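For illustration, a minimal sketch of the intended semantics (the table name t, its
contents, and the inserted value are hypothetical):

BEGIN
  DECLARE fetched_id INT;
  DECLARE cur CURSOR FOR SELECT id FROM t ORDER BY id;
  OPEN cur;                    -- snapshot of t is pinned here ("do-at-open")
  INSERT INTO t VALUES (42);   -- inserted after OPEN, so not visible to cur
  FETCH cur INTO fetched_id;   -- reads from the OPEN-time snapshot
  CLOSE cur;
END;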
### Why are the changes needed?
To get clean and standards-compliant cursor semantics.
### Does this PR introduce _any_ user-facing change?
No (unreleased feature)
### How was this patch tested?
Added more tests and verified the expected changes in existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Yes, co-authored with Claude Opus.
Closes #54230 from srielau/SPARK-55451-cursors-do-at-open.
Authored-by: Serge Rielau <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../sql/catalyst/expressions/CursorReference.scala | 11 ++--
.../sql/execution/command/v2/FetchCursorExec.scala | 51 +++++++++----------
.../sql/execution/command/v2/OpenCursorExec.scala | 19 +++++--
.../apache/spark/sql/scripting/CursorState.scala | 31 +++++++-----
.../sql/scripting/SqlScriptingCursorE2eSuite.scala | 59 +++++++++++++++++++---
5 files changed, 119 insertions(+), 52 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
index 365bd384b188..070d0dd0a914 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.trees.TreePattern.{CURSOR_REFERENCE,
TreePattern => TP, UNRESOLVED_CURSOR}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, NullType}
/**
* Immutable cursor definition containing the cursor's normalized name and SQL query text.
@@ -55,6 +55,10 @@ case class UnresolvedCursor(nameParts: Seq[String]) extends LeafExpression with
* after normalizing the cursor name parts, checking case sensitivity, and looking up the
* cursor definition from the scripting context.
*
+ * CursorReference is never actually evaluated - it's only used as a marker in cursor commands
+ * (OPEN, FETCH, CLOSE). Therefore, dataType returns NullType to satisfy serialization/profiling
+ * requirements without throwing exceptions.
+ *
* @param nameParts The original cursor name parts (unnormalized)
* @param normalizedName The normalized cursor name (for lookups considering case sensitivity)
* @param scopeLabel Optional label qualifier for scoped cursors (e.g., Some("label") for
@@ -66,8 +70,9 @@ case class CursorReference(
normalizedName: String,
scopeLabel: Option[String],
definition: CursorDefinition) extends LeafExpression with Unevaluable {
- override def dataType: DataType = throw new UnresolvedException("dataType")
- override def nullable: Boolean = throw new UnresolvedException("nullable")
+ // CursorReference is never evaluated, but dataType must return a valid type for serialization
+ override def dataType: DataType = NullType
+ override def nullable: Boolean = true
override def eval(input: InternalRow): Any =
throw new UnsupportedOperationException("CursorReference cannot be evaluated")
override def sql: String = nameParts.mkString(".")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
index 5b34233014a9..b2c42d64c0a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, SqlScriptingContextManager}
import org.apache.spark.sql.catalyst.analysis.{FakeLocalCatalog, FakeSystemCatalog}
import org.apache.spark.sql.catalyst.catalog.VariableDefinition
-import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, Expression, Literal, VariableReference}
-import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.VariableReference
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
@@ -34,9 +34,10 @@ import org.apache.spark.sql.scripting.{CursorFetching, CursorOpened}
/**
* Physical plan node for fetching from cursors.
*
- * Transitions cursor from Opened to Fetching state on first fetch (creating result iterator),
- * then fetches rows from the iterator on subsequent calls. Assigns fetched values to target
- * variables with ANSI store assignment rules.
+ * Fetches the next row from the result iterator that was created at OPEN time.
+ * On first fetch, transitions from Opened to Fetching state.
+ *
+ * Assigns fetched values to target variables with ANSI store assignment rules.
*
* @param cursor CursorReference resolved during analysis phase
* @param targetVariables Variables to fetch into
@@ -57,24 +58,20 @@ case class FetchCursorExec(
errorClass = "CURSOR_NOT_FOUND",
messageParameters = Map("cursorName" ->
toSQLId(cursorRef.definition.name))))
- // Get or create iterator based on current state
- val (iterator, analyzedQuery) = currentState match {
- case CursorOpened(query) =>
- // First fetch - create iterator and transition to Fetching state
- // Use executeToIterator() to get InternalRow directly, avoiding conversion overhead
- val df = Dataset.ofRows(
- session.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
- query)
- val iter = df.queryExecution.executedPlan.executeToIterator()
+ // Get iterator based on current state
+ val (iterator, outputSchema) = currentState match {
+ case CursorOpened(iter, schema) =>
+ // First fetch - transition to Fetching state
+ // Iterator was already created at OPEN time
scriptingContext.updateCursorState(
cursorRef.normalizedName,
cursorRef.scopeLabel,
- CursorFetching(query, iter))
- (iter, query)
+ CursorFetching(iter, schema))
+ (iter, schema)
- case CursorFetching(query, iter) =>
+ case CursorFetching(iter, schema) =>
// Subsequent fetch - use existing iterator
- (iter, query)
+ (iter, schema)
case _ =>
throw new AnalysisException(
@@ -98,10 +95,10 @@ case class FetchCursorExec(
targetVariables.head,
targetVariables.head.dataType.asInstanceOf[org.apache.spark.sql.types.StructType],
currentRow,
- analyzedQuery)
+ outputSchema)
} else {
// Regular case: one-to-one column-to-variable assignment
- fetchIntoVariables(targetVariables, currentRow, analyzedQuery)
+ fetchIntoVariables(targetVariables, currentRow, outputSchema)
}
Nil
@@ -127,7 +124,7 @@ case class FetchCursorExec(
private def fetchIntoVariables(
targetVariables: Seq[VariableReference],
currentRow: InternalRow,
- analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): Unit = {
+ outputSchema: Seq[Attribute]): Unit = {
// Validate arity
if (targetVariables.length != currentRow.numFields) {
throw new AnalysisException(
@@ -139,10 +136,10 @@ case class FetchCursorExec(
// Assign each column to its corresponding variable
targetVariables.zipWithIndex.foreach { case (varRef, idx) =>
- val sourceValue = currentRow.get(idx, analyzedQuery.output(idx).dataType)
+ val sourceValue = currentRow.get(idx, outputSchema(idx).dataType)
val castedValue = applyCastIfNeeded(
sourceValue,
- analyzedQuery.output(idx).dataType,
+ outputSchema(idx).dataType,
varRef.dataType)
assignToVariable(varRef, castedValue)
@@ -225,7 +222,7 @@ case class FetchCursorExec(
targetVar: VariableReference,
structType: org.apache.spark.sql.types.StructType,
currentRow: InternalRow,
- analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): Unit = {
+ outputSchema: Seq[Attribute]): Unit = {
import org.apache.spark.sql.catalyst.expressions.{Cast, CreateStruct, Literal}
import org.apache.spark.sql.catalyst.InternalRow
@@ -240,11 +237,11 @@ case class FetchCursorExec(
// Build struct fields by extracting and casting cursor columns
val fieldExpressions = structType.fields.zipWithIndex.map { case (field, idx) =>
- val sourceValue = currentRow.get(idx, analyzedQuery.output(idx).dataType)
- val sourceLiteral = Literal(sourceValue, analyzedQuery.output(idx).dataType)
+ val sourceValue = currentRow.get(idx, outputSchema(idx).dataType)
+ val sourceLiteral = Literal(sourceValue, outputSchema(idx).dataType)
// Apply ANSI cast if types differ
- if (analyzedQuery.output(idx).dataType == field.dataType) {
+ if (outputSchema(idx).dataType == field.dataType) {
sourceLiteral
} else {
Cast(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
index d90b6af4c00f..26b89f0516c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
@@ -31,8 +31,14 @@ import org.apache.spark.sql.scripting.{CursorClosed, CursorDeclared, CursorOpene
* 1. Parsing the cursor's SQL query text to a LogicalPlan
* 2. Binding parameters (if USING clause is provided)
* 3. Analyzing the query (semantic analysis, resolution, type checking)
+ * 4. Generating physical plan and creating the result iterator
*
- * Does not execute the query or create result iterator - that happens on first FETCH.
+ * CRITICAL: executeToIterator() MUST be called at OPEN time to capture snapshots.
+ * This is the only point where Spark locks in file lists and Delta snapshots.
+ * Simply creating the physical plan is not sufficient - execution must start.
+ *
+ * The iterator is lazy - it doesn't materialize all data, but it does lock in
+ * which files/versions will be scanned.
*
* Uses ParameterizedQueryExecutor trait for unified parameter binding with EXECUTE IMMEDIATE.
*
@@ -73,11 +79,18 @@ case class OpenCursorExec(
// Uses shared ParameterizedQueryExecutor trait for consistency with EXECUTE IMMEDIATE
val analyzedQuery = executeParameterizedQuery(cursorDef.queryText, args, paramNames)
- // Transition cursor state to Opened
+ // Generate physical plan and create iterator at OPEN time
+ // CRITICAL: executeToIterator() must be called NOW to capture snapshot
+ val df = org.apache.spark.sql.classic.Dataset.ofRows(
+ session.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+ analyzedQuery)
+ val resultIterator = df.queryExecution.executedPlan.executeToIterator()
+
+ // Transition cursor state to Opened with iterator
scriptingContext.updateCursorState(
cursorRef.normalizedName,
cursorRef.scopeLabel,
- CursorOpened(analyzedQuery))
+ CursorOpened(resultIterator, analyzedQuery.output))
Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
index 13f12e26f632..ba144f708761 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.scripting
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
/**
* Sealed trait representing the lifecycle state of a cursor.
* State transitions:
@@ -32,25 +35,29 @@ sealed trait CursorState
case object CursorDeclared extends CursorState
/**
- * Cursor has been opened and query has been parsed/analyzed.
- * For parameterized cursors, the query here has parameters substituted.
+ * Cursor has been opened and result iterator has been created.
+ *
+ * CRITICAL: The iterator is created at OPEN time (not first FETCH) to ensure snapshot
+ * semantics. When executeToIterator() is called, Spark performs file discovery and
+ * captures Delta snapshots. This is the ONLY way to lock in the data snapshot at OPEN time.
*
- * @param analyzedQuery The analyzed logical plan with parameters bound
+ * The iterator is lazy/incremental - it doesn't materialize all results, but it does
+ * lock in which files/versions will be read.
+ *
+ * @param resultIterator Iterator created at OPEN time (snapshot captured)
+ * @param outputSchema The output attributes (needed for type checking in FETCH)
*/
case class CursorOpened(
- analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) extends CursorState
+ resultIterator: Iterator[InternalRow],
+ outputSchema: Seq[Attribute]) extends CursorState
/**
- * Cursor is being fetched from - result iterator is active.
- * Uses executeToIterator() to avoid loading all data into memory at once.
- * Uses InternalRow format to avoid unnecessary conversions.
- *
- * @param analyzedQuery The analyzed logical plan
- * @param resultIterator Iterator over InternalRow results
+ * Cursor is being fetched from - same as CursorOpened but marks that fetching has begun.
+ * We keep this separate state for consistency and potential future use.
*/
case class CursorFetching(
- analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan,
- resultIterator: Iterator[org.apache.spark.sql.catalyst.InternalRow]) extends CursorState
+ resultIterator: Iterator[InternalRow],
+ outputSchema: Seq[Attribute]) extends CursorState
/**
* Cursor has been closed and resources released.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
index dfd1cee49ae7..97eaf66c9d78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
@@ -359,7 +359,13 @@ END;""")
)
}
- test("Test 20: Cursor sensitivity - cursor captures snapshot when opened") {
+ test("Test 20: Cursor sensitivity - INSENSITIVE cursors capture snapshot at
OPEN") {
+ // Spark cursors are INSENSITIVE by default, which means they capture a snapshot when
+ // OPEN is called. The physical plan is generated at OPEN time, locking in file lists,
+ // Delta snapshots, and other data source metadata.
+ //
+ // This test verifies snapshot semantics: rows inserted AFTER OPEN but BEFORE first
+ // FETCH should NOT be visible to the cursor.
sql("CREATE TABLE cursor_sensitivity_test (id INT, value STRING) USING
parquet;")
sql("INSERT INTO cursor_sensitivity_test VALUES (1, 'row1'), (2, 'row2');")
val result = sql("""BEGIN
@@ -382,10 +388,10 @@ END;""")
-- Step 4: OPEN the cursor (captures snapshot - should see rows 1-4)
OPEN cur;
- -- Step 5: Add more rows after OPEN
+ -- Step 5: Add more rows after OPEN but before first FETCH
INSERT INTO cursor_sensitivity_test VALUES (5, 'row5'), (6, 'row6');
- -- Step 6: Fetch rows - should see snapshot from OPEN time (4 rows)
+ -- Step 6: Fetch rows - should see snapshot from OPEN time (4 rows only)
REPEAT
FETCH cur INTO fetched_id, fetched_value;
IF NOT nomorerows THEN
@@ -396,11 +402,11 @@ END;""")
-- Step 7: Close the cursor
CLOSE cur;
- -- Step 8: Open the cursor again (should capture new snapshot)
+ -- Step 8: Open the cursor again (captures new snapshot with all 6 rows)
SET nomorerows = false;
OPEN cur;
- -- Step 9: Fetch rows - demonstrates cursor behavior
+ -- Step 9: Fetch rows - should see all 6 rows now
REPEAT
FETCH cur INTO fetched_id, fetched_value;
IF NOT nomorerows THEN
@@ -408,13 +414,13 @@ END;""")
END IF;
UNTIL nomorerows END REPEAT;
- -- Return both counts
+ -- Return both counts (first=4, second=6)
VALUES (row_count_first_open, row_count_second_open);
CLOSE cur;
END;""")
checkAnswer(result, Seq(
- Row(6, 6)))
+ Row(4, 6)))
}
test("Test 21: Basic parameterized cursor with positional parameters") {
@@ -1900,5 +1906,44 @@ END;""")
checkAnswer(result, Seq(Row("Done")))
}
+ test("Test 92: Runtime error in cursor query caught at OPEN (snapshot
semantics)") {
+ // With snapshot semantics, executeToIterator() is called at OPEN time to
capture
+ // the data snapshot. This means query execution begins at OPEN, not at
first FETCH.
+ //
+ // IMPORTANT: Runtime errors in the cursor query are now thrown at OPEN
time because
+ // that's when execution starts. This is a consequence of implementing
INSENSITIVE
+ // cursor semantics where the snapshot must be captured at OPEN.
+ //
+ // This test verifies that runtime errors (e.g., divide by zero) are
caught at OPEN.
+ sql("CREATE TABLE cursor_row_error_test (id INT, value INT) USING parquet")
+ sql("INSERT INTO cursor_row_error_test VALUES (1, 10), (2, 0), (3, 5)")
+ try {
+ val result = sql("""BEGIN
+ DECLARE result STRING DEFAULT 'start';
+ DECLARE x INT;
+ DECLARE y INT;
+
+ BEGIN
+ DECLARE cur CURSOR FOR SELECT id, 100 / value AS result FROM cursor_row_error_test ORDER BY id;
+
+ DECLARE EXIT HANDLER FOR SQLEXCEPTION
+ SET result = result || ' | failed at open';
+
+ SET result = 'declared';
+ OPEN cur; -- Fails here: executeToIterator() starts execution, hits divide-by-zero
+ SET result = 'opened'; -- Should NOT reach here
+ FETCH cur INTO x, y;
+ SET result = 'fetched: ' || x || ',' || y; -- Should NOT reach here
+ END;
+
+ SELECT result;
+END;""")
+ // Should show DECLARE succeeded, but OPEN failed due to divide-by-zero
+ checkAnswer(result, Seq(Row("declared | failed at open")))
+ } finally {
+ sql("DROP TABLE IF EXISTS cursor_row_error_test")
+ }
+ }
+
}
// scalastyle:on line.size.limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]