This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ecf003c5671e [SPARK-55451][SQL] Cursors must start collecting results 
on OPEN, not first FETCH
ecf003c5671e is described below

commit ecf003c5671e05b37b6fdb04beb7958b803462ce
Author: Serge Rielau <[email protected]>
AuthorDate: Tue Feb 10 14:06:19 2026 -0800

    [SPARK-55451][SQL] Cursors must start collecting results on OPEN, not first 
FETCH
    
    ### What changes were proposed in this pull request?
    
    Cursors should use snapshot semantics pinned to the OPEN statement, not the 
first FETCH.
    This change fixes that.
    Ideally we would want to simply collect the files, and not start scans 
at OPEN, so we do not get runtime errors on OPEN, but that is quite difficult. 
Starting the scans at OPEN ("do-at-open") is generally acceptable behavior 
though for an ASENSITIVE cursor.
    
    ### Why are the changes needed?
    
    To get clean, standard-compliant semantics.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No (unreleased feature)
    
    ### How was this patch tested?
    
    Added more tests; verified that existing tests change as expected.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Opus
    
    Closes #54230 from srielau/SPARK-55451-cursors-do-at-open.
    
    Authored-by: Serge Rielau <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../sql/catalyst/expressions/CursorReference.scala | 11 ++--
 .../sql/execution/command/v2/FetchCursorExec.scala | 51 +++++++++----------
 .../sql/execution/command/v2/OpenCursorExec.scala  | 19 +++++--
 .../apache/spark/sql/scripting/CursorState.scala   | 31 +++++++-----
 .../sql/scripting/SqlScriptingCursorE2eSuite.scala | 59 +++++++++++++++++++---
 5 files changed, 119 insertions(+), 52 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
index 365bd384b188..070d0dd0a914 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CursorReference.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.trees.TreePattern.{CURSOR_REFERENCE, 
TreePattern => TP, UNRESOLVED_CURSOR}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, NullType}
 
 /**
  * Immutable cursor definition containing the cursor's normalized name and SQL 
query text.
@@ -55,6 +55,10 @@ case class UnresolvedCursor(nameParts: Seq[String]) extends 
LeafExpression with
  * after normalizing the cursor name parts, checking case sensitivity, and 
looking up the
  * cursor definition from the scripting context.
  *
+ * CursorReference is never actually evaluated - it's only used as a marker in 
cursor commands
+ * (OPEN, FETCH, CLOSE). Therefore, dataType returns NullType to satisfy 
serialization/profiling
+ * requirements without throwing exceptions.
+ *
  * @param nameParts The original cursor name parts (unnormalized)
  * @param normalizedName The normalized cursor name (for lookups considering 
case sensitivity)
  * @param scopeLabel Optional label qualifier for scoped cursors (e.g., 
Some("label") for
@@ -66,8 +70,9 @@ case class CursorReference(
     normalizedName: String,
     scopeLabel: Option[String],
     definition: CursorDefinition) extends LeafExpression with Unevaluable {
-  override def dataType: DataType = throw new UnresolvedException("dataType")
-  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  // CursorReference is never evaluated, but dataType must return a valid type 
for serialization
+  override def dataType: DataType = NullType
+  override def nullable: Boolean = true
   override def eval(input: InternalRow): Any =
     throw new UnsupportedOperationException("CursorReference cannot be 
evaluated")
   override def sql: String = nameParts.mkString(".")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
index 5b34233014a9..b2c42d64c0a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SqlScriptingContextManager}
 import org.apache.spark.sql.catalyst.analysis.{FakeLocalCatalog, 
FakeSystemCatalog}
 import org.apache.spark.sql.catalyst.catalog.VariableDefinition
-import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, 
Expression, Literal, VariableReference}
-import org.apache.spark.sql.classic.Dataset
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CursorReference, 
Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.VariableReference
 import org.apache.spark.sql.errors.DataTypeErrorsBase
 import 
org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError
 import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
@@ -34,9 +34,10 @@ import org.apache.spark.sql.scripting.{CursorFetching, 
CursorOpened}
 /**
  * Physical plan node for fetching from cursors.
  *
- * Transitions cursor from Opened to Fetching state on first fetch (creating 
result iterator),
- * then fetches rows from the iterator on subsequent calls. Assigns fetched 
values to target
- * variables with ANSI store assignment rules.
+ * Fetches the next row from the result iterator that was created at OPEN time.
+ * On first fetch, transitions from Opened to Fetching state.
+ *
+ * Assigns fetched values to target variables with ANSI store assignment rules.
  *
  * @param cursor CursorReference resolved during analysis phase
  * @param targetVariables Variables to fetch into
@@ -57,24 +58,20 @@ case class FetchCursorExec(
         errorClass = "CURSOR_NOT_FOUND",
         messageParameters = Map("cursorName" -> 
toSQLId(cursorRef.definition.name))))
 
-    // Get or create iterator based on current state
-    val (iterator, analyzedQuery) = currentState match {
-      case CursorOpened(query) =>
-        // First fetch - create iterator and transition to Fetching state
-        // Use executeToIterator() to get InternalRow directly, avoiding 
conversion overhead
-        val df = Dataset.ofRows(
-          session.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
-          query)
-        val iter = df.queryExecution.executedPlan.executeToIterator()
+    // Get iterator based on current state
+    val (iterator, outputSchema) = currentState match {
+      case CursorOpened(iter, schema) =>
+        // First fetch - transition to Fetching state
+        // Iterator was already created at OPEN time
         scriptingContext.updateCursorState(
           cursorRef.normalizedName,
           cursorRef.scopeLabel,
-          CursorFetching(query, iter))
-        (iter, query)
+          CursorFetching(iter, schema))
+        (iter, schema)
 
-      case CursorFetching(query, iter) =>
+      case CursorFetching(iter, schema) =>
         // Subsequent fetch - use existing iterator
-        (iter, query)
+        (iter, schema)
 
       case _ =>
         throw new AnalysisException(
@@ -98,10 +95,10 @@ case class FetchCursorExec(
         targetVariables.head,
         
targetVariables.head.dataType.asInstanceOf[org.apache.spark.sql.types.StructType],
         currentRow,
-        analyzedQuery)
+        outputSchema)
     } else {
       // Regular case: one-to-one column-to-variable assignment
-      fetchIntoVariables(targetVariables, currentRow, analyzedQuery)
+      fetchIntoVariables(targetVariables, currentRow, outputSchema)
     }
 
     Nil
@@ -127,7 +124,7 @@ case class FetchCursorExec(
   private def fetchIntoVariables(
       targetVariables: Seq[VariableReference],
       currentRow: InternalRow,
-      analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): 
Unit = {
+      outputSchema: Seq[Attribute]): Unit = {
     // Validate arity
     if (targetVariables.length != currentRow.numFields) {
       throw new AnalysisException(
@@ -139,10 +136,10 @@ case class FetchCursorExec(
 
     // Assign each column to its corresponding variable
     targetVariables.zipWithIndex.foreach { case (varRef, idx) =>
-      val sourceValue = currentRow.get(idx, analyzedQuery.output(idx).dataType)
+      val sourceValue = currentRow.get(idx, outputSchema(idx).dataType)
       val castedValue = applyCastIfNeeded(
         sourceValue,
-        analyzedQuery.output(idx).dataType,
+        outputSchema(idx).dataType,
         varRef.dataType)
 
       assignToVariable(varRef, castedValue)
@@ -225,7 +222,7 @@ case class FetchCursorExec(
       targetVar: VariableReference,
       structType: org.apache.spark.sql.types.StructType,
       currentRow: InternalRow,
-      analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan): 
Unit = {
+      outputSchema: Seq[Attribute]): Unit = {
     import org.apache.spark.sql.catalyst.expressions.{Cast, CreateStruct, 
Literal}
     import org.apache.spark.sql.catalyst.InternalRow
 
@@ -240,11 +237,11 @@ case class FetchCursorExec(
 
     // Build struct fields by extracting and casting cursor columns
     val fieldExpressions = structType.fields.zipWithIndex.map { case (field, 
idx) =>
-      val sourceValue = currentRow.get(idx, analyzedQuery.output(idx).dataType)
-      val sourceLiteral = Literal(sourceValue, 
analyzedQuery.output(idx).dataType)
+      val sourceValue = currentRow.get(idx, outputSchema(idx).dataType)
+      val sourceLiteral = Literal(sourceValue, outputSchema(idx).dataType)
 
       // Apply ANSI cast if types differ
-      if (analyzedQuery.output(idx).dataType == field.dataType) {
+      if (outputSchema(idx).dataType == field.dataType) {
         sourceLiteral
       } else {
         Cast(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
index d90b6af4c00f..26b89f0516c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/OpenCursorExec.scala
@@ -31,8 +31,14 @@ import org.apache.spark.sql.scripting.{CursorClosed, 
CursorDeclared, CursorOpene
  * 1. Parsing the cursor's SQL query text to a LogicalPlan
  * 2. Binding parameters (if USING clause is provided)
  * 3. Analyzing the query (semantic analysis, resolution, type checking)
+ * 4. Generating physical plan and creating the result iterator
  *
- * Does not execute the query or create result iterator - that happens on 
first FETCH.
+ * CRITICAL: executeToIterator() MUST be called at OPEN time to capture 
snapshots.
+ * This is the only point where Spark locks in file lists and Delta snapshots.
+ * Simply creating the physical plan is not sufficient - execution must start.
+ *
+ * The iterator is lazy - it doesn't materialize all data, but it does lock in
+ * which files/versions will be scanned.
  *
  * Uses ParameterizedQueryExecutor trait for unified parameter binding with 
EXECUTE IMMEDIATE.
  *
@@ -73,11 +79,18 @@ case class OpenCursorExec(
     // Uses shared ParameterizedQueryExecutor trait for consistency with 
EXECUTE IMMEDIATE
     val analyzedQuery = executeParameterizedQuery(cursorDef.queryText, args, 
paramNames)
 
-    // Transition cursor state to Opened
+    // Generate physical plan and create iterator at OPEN time
+    // CRITICAL: executeToIterator() must be called NOW to capture snapshot
+    val df = org.apache.spark.sql.classic.Dataset.ofRows(
+      session.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+      analyzedQuery)
+    val resultIterator = df.queryExecution.executedPlan.executeToIterator()
+
+    // Transition cursor state to Opened with iterator
     scriptingContext.updateCursorState(
       cursorRef.normalizedName,
       cursorRef.scopeLabel,
-      CursorOpened(analyzedQuery))
+      CursorOpened(resultIterator, analyzedQuery.output))
 
     Nil
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
index 13f12e26f632..ba144f708761 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/CursorState.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.scripting
 
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
 /**
  * Sealed trait representing the lifecycle state of a cursor.
  * State transitions:
@@ -32,25 +35,29 @@ sealed trait CursorState
 case object CursorDeclared extends CursorState
 
 /**
- * Cursor has been opened and query has been parsed/analyzed.
- * For parameterized cursors, the query here has parameters substituted.
+ * Cursor has been opened and result iterator has been created.
+ *
+ * CRITICAL: The iterator is created at OPEN time (not first FETCH) to ensure 
snapshot
+ * semantics. When executeToIterator() is called, Spark performs file 
discovery and
+ * captures Delta snapshots. This is the ONLY way to lock in the data snapshot 
at OPEN time.
  *
- * @param analyzedQuery The analyzed logical plan with parameters bound
+ * The iterator is lazy/incremental - it doesn't materialize all results, but 
it does
+ * lock in which files/versions will be read.
+ *
+ * @param resultIterator Iterator created at OPEN time (snapshot captured)
+ * @param outputSchema The output attributes (needed for type checking in 
FETCH)
  */
 case class CursorOpened(
-    analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan) 
extends CursorState
+    resultIterator: Iterator[InternalRow],
+    outputSchema: Seq[Attribute]) extends CursorState
 
 /**
- * Cursor is being fetched from - result iterator is active.
- * Uses executeToIterator() to avoid loading all data into memory at once.
- * Uses InternalRow format to avoid unnecessary conversions.
- *
- * @param analyzedQuery The analyzed logical plan
- * @param resultIterator Iterator over InternalRow results
+ * Cursor is being fetched from - same as CursorOpened but marks that fetching 
has begun.
+ * We keep this separate state for consistency and potential future use.
  */
 case class CursorFetching(
-    analyzedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan,
-    resultIterator: Iterator[org.apache.spark.sql.catalyst.InternalRow]) 
extends CursorState
+    resultIterator: Iterator[InternalRow],
+    outputSchema: Seq[Attribute]) extends CursorState
 
 /**
  * Cursor has been closed and resources released.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
index dfd1cee49ae7..97eaf66c9d78 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingCursorE2eSuite.scala
@@ -359,7 +359,13 @@ END;""")
     )
   }
 
-  test("Test 20: Cursor sensitivity - cursor captures snapshot when opened") {
+  test("Test 20: Cursor sensitivity - INSENSITIVE cursors capture snapshot at 
OPEN") {
+    // Spark cursors are INSENSITIVE by default, which means they capture a 
snapshot when
+    // OPEN is called. The physical plan is generated at OPEN time, locking in 
file lists,
+    // Delta snapshots, and other data source metadata.
+    //
+    // This test verifies snapshot semantics: rows inserted AFTER OPEN but 
BEFORE first
+    // FETCH should NOT be visible to the cursor.
     sql("CREATE TABLE cursor_sensitivity_test (id INT, value STRING) USING 
parquet;")
     sql("INSERT INTO cursor_sensitivity_test VALUES (1, 'row1'), (2, 'row2');")
     val result = sql("""BEGIN
@@ -382,10 +388,10 @@ END;""")
   -- Step 4: OPEN the cursor (captures snapshot - should see rows 1-4)
   OPEN cur;
 
-  -- Step 5: Add more rows after OPEN
+  -- Step 5: Add more rows after OPEN but before first FETCH
   INSERT INTO cursor_sensitivity_test VALUES (5, 'row5'), (6, 'row6');
 
-  -- Step 6: Fetch rows - should see snapshot from OPEN time (4 rows)
+  -- Step 6: Fetch rows - should see snapshot from OPEN time (4 rows only)
   REPEAT
     FETCH cur INTO fetched_id, fetched_value;
     IF NOT nomorerows THEN
@@ -396,11 +402,11 @@ END;""")
   -- Step 7: Close the cursor
   CLOSE cur;
 
-  -- Step 8: Open the cursor again (should capture new snapshot)
+  -- Step 8: Open the cursor again (captures new snapshot with all 6 rows)
   SET nomorerows = false;
   OPEN cur;
 
-  -- Step 9: Fetch rows - demonstrates cursor behavior
+  -- Step 9: Fetch rows - should see all 6 rows now
   REPEAT
     FETCH cur INTO fetched_id, fetched_value;
     IF NOT nomorerows THEN
@@ -408,13 +414,13 @@ END;""")
     END IF;
   UNTIL nomorerows END REPEAT;
 
-  -- Return both counts
+  -- Return both counts (first=4, second=6)
   VALUES (row_count_first_open, row_count_second_open);
 
   CLOSE cur;
 END;""")
     checkAnswer(result, Seq(
-      Row(6, 6)))
+      Row(4, 6)))
   }
 
   test("Test 21: Basic parameterized cursor with positional parameters") {
@@ -1900,5 +1906,44 @@ END;""")
     checkAnswer(result, Seq(Row("Done")))
   }
 
+  test("Test 92: Runtime error in cursor query caught at OPEN (snapshot 
semantics)") {
+    // With snapshot semantics, executeToIterator() is called at OPEN time to 
capture
+    // the data snapshot. This means query execution begins at OPEN, not at 
first FETCH.
+    //
+    // IMPORTANT: Runtime errors in the cursor query are now thrown at OPEN 
time because
+    // that's when execution starts. This is a consequence of implementing 
INSENSITIVE
+    // cursor semantics where the snapshot must be captured at OPEN.
+    //
+    // This test verifies that runtime errors (e.g., divide by zero) are 
caught at OPEN.
+    sql("CREATE TABLE cursor_row_error_test (id INT, value INT) USING parquet")
+    sql("INSERT INTO cursor_row_error_test VALUES (1, 10), (2, 0), (3, 5)")
+    try {
+      val result = sql("""BEGIN
+  DECLARE result STRING DEFAULT 'start';
+  DECLARE x INT;
+  DECLARE y INT;
+
+  BEGIN
+    DECLARE cur CURSOR FOR SELECT id, 100 / value AS result FROM 
cursor_row_error_test ORDER BY id;
+
+    DECLARE EXIT HANDLER FOR SQLEXCEPTION
+      SET result = result || ' | failed at open';
+
+    SET result = 'declared';
+    OPEN cur;  -- Fails here: executeToIterator() starts execution, hits 
divide-by-zero
+    SET result = 'opened';  -- Should NOT reach here
+    FETCH cur INTO x, y;
+    SET result = 'fetched: ' || x || ',' || y;  -- Should NOT reach here
+  END;
+
+  SELECT result;
+END;""")
+      // Should show DECLARE succeeded, but OPEN failed due to divide-by-zero
+      checkAnswer(result, Seq(Row("declared | failed at open")))
+    } finally {
+      sql("DROP TABLE IF EXISTS cursor_row_error_test")
+    }
+  }
+
 }
 // scalastyle:on line.size.limit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to