This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b5d195a5cd1b [SPARK-48344][SQL] Add Frames and Scopes to support 
Exception Handlers and Local Variables
b5d195a5cd1b is described below

commit b5d195a5cd1b027d6cf1d7b992a6ef6c2f18520f
Author: Milan Dankovic <[email protected]>
AuthorDate: Thu Dec 12 16:41:30 2024 +0300

    [SPARK-48344][SQL] Add Frames and Scopes to support Exception Handlers and 
Local Variables
    
    ### What changes were proposed in this pull request?
    This PR is **third** in series of refactoring and introducing SQL Scripting 
Execution Framework:
    - Introducing `SqlScriptingExecutionContext`, object to keep current state 
of script execution.
    - Introducing `Frames` and `Scopes` to support Local Variables and Error 
Handlers resolution.
    - Decoupled `SqlScriptingIterator` from `SqlScriptExecution`.
    - Enabling execution of SQL Scripting using `sql()` API.
    - Updated `SqlScriptingExecutionNodeSuite` so tests remain independent of 
concept of Frames and Scopes.
    
    First [PR](https://github.com/apache/spark/pull/48879)
    Second [PR](https://github.com/apache/spark/pull/48950)
    
    ### Why are the changes needed?
    This changes are needed to enable introduction of Error Handling mechanism 
and Local Variables.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests that were updated to support new concepts introduced in this 
PR.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #49006 from miland-db/milan-dankovic_data/refactor-execution-3.
    
    Lead-authored-by: Milan Dankovic <[email protected]>
    Co-authored-by: David Milicevic <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  80 ++++-
 .../plans/logical/SqlScriptingLogicalPlans.scala   |   5 +-
 .../sql/scripting/SqlScriptingExecution.scala      |  44 ++-
 .../scripting/SqlScriptingExecutionContext.scala   |  91 ++++++
 .../sql/scripting/SqlScriptingExecutionNode.scala  |  86 ++++-
 .../sql/scripting/SqlScriptingInterpreter.scala    |  62 ++--
 .../scripting/SqlScriptingExecutionNodeSuite.scala | 355 +++++++++++----------
 .../scripting/SqlScriptingInterpreterSuite.scala   |  10 +-
 8 files changed, 508 insertions(+), 225 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 47139810528d..64491264f3e9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -144,16 +144,27 @@ class AstBuilder extends DataTypeAstBuilder
 
   override def visitSingleCompoundStatement(ctx: 
SingleCompoundStatementContext): CompoundBody = {
     val labelCtx = new SqlScriptingLabelContext()
-    Option(ctx.compoundBody())
-      .map(visitCompoundBodyImpl(_, None, allowVarDeclare = true, labelCtx))
-      .getOrElse(CompoundBody(Seq.empty, None))
+    val labelText = labelCtx.enterLabeledScope(None, None)
+
+    val script = Option(ctx.compoundBody())
+      .map(visitCompoundBodyImpl(
+        _,
+        Some(labelText),
+        allowVarDeclare = true,
+        labelCtx,
+        isScope = true
+      )).getOrElse(CompoundBody(Seq.empty, Some(labelText), isScope = true))
+
+    labelCtx.exitLabeledScope(None)
+    script
   }
 
   private def visitCompoundBodyImpl(
       ctx: CompoundBodyContext,
       label: Option[String],
       allowVarDeclare: Boolean,
-      labelCtx: SqlScriptingLabelContext): CompoundBody = {
+      labelCtx: SqlScriptingLabelContext,
+      isScope: Boolean): CompoundBody = {
     val buff = ListBuffer[CompoundPlanStatement]()
     ctx.compoundStatements.forEach(
       compoundStatement => buff += 
visitCompoundStatementImpl(compoundStatement, labelCtx))
@@ -185,7 +196,7 @@ class AstBuilder extends DataTypeAstBuilder
       case _ =>
     }
 
-    CompoundBody(buff.toSeq, label)
+    CompoundBody(buff.toSeq, label, isScope)
   }
 
   private def visitBeginEndCompoundBlockImpl(
@@ -194,8 +205,13 @@ class AstBuilder extends DataTypeAstBuilder
     val labelText =
       labelCtx.enterLabeledScope(Option(ctx.beginLabel()), 
Option(ctx.endLabel()))
     val body = Option(ctx.compoundBody())
-      .map(visitCompoundBodyImpl(_, Some(labelText), allowVarDeclare = true, 
labelCtx))
-      .getOrElse(CompoundBody(Seq.empty, Some(labelText)))
+      .map(visitCompoundBodyImpl(
+        _,
+        Some(labelText),
+        allowVarDeclare = true,
+        labelCtx,
+        isScope = true
+      )).getOrElse(CompoundBody(Seq.empty, Some(labelText), isScope = true))
     labelCtx.exitLabeledScope(Option(ctx.beginLabel()))
     body
   }
@@ -246,10 +262,12 @@ class AstBuilder extends DataTypeAstBuilder
             OneRowRelation()))
       }),
       conditionalBodies = ctx.conditionalBodies.asScala.toList.map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       ),
       elseBody = Option(ctx.elseBody).map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       )
     )
   }
@@ -266,7 +284,13 @@ class AstBuilder extends DataTypeAstBuilder
         Project(
           Seq(Alias(expression(boolExpr), "condition")()),
           OneRowRelation()))}
-    val body = visitCompoundBodyImpl(ctx.compoundBody(), None, allowVarDeclare 
= false, labelCtx)
+    val body = visitCompoundBodyImpl(
+      ctx.compoundBody(),
+      None,
+      allowVarDeclare = false,
+      labelCtx,
+      isScope = false
+    )
     labelCtx.exitLabeledScope(Option(ctx.beginLabel()))
 
     WhileStatement(condition, body, Some(labelText))
@@ -283,7 +307,8 @@ class AstBuilder extends DataTypeAstBuilder
     })
     val conditionalBodies =
       ctx.conditionalBodies.asScala.toList.map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       )
 
     if (conditions.length != conditionalBodies.length) {
@@ -296,7 +321,8 @@ class AstBuilder extends DataTypeAstBuilder
       conditions = conditions,
       conditionalBodies = conditionalBodies,
       elseBody = Option(ctx.elseBody).map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       ))
   }
 
@@ -313,7 +339,8 @@ class AstBuilder extends DataTypeAstBuilder
     })
     val conditionalBodies =
       ctx.conditionalBodies.asScala.toList.map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       )
 
     if (conditions.length != conditionalBodies.length) {
@@ -326,7 +353,8 @@ class AstBuilder extends DataTypeAstBuilder
       conditions = conditions,
       conditionalBodies = conditionalBodies,
       elseBody = Option(ctx.elseBody).map(
-        body => visitCompoundBodyImpl(body, None, allowVarDeclare = false, 
labelCtx)
+        body =>
+          visitCompoundBodyImpl(body, None, allowVarDeclare = false, labelCtx, 
isScope = false)
       ))
   }
 
@@ -342,7 +370,13 @@ class AstBuilder extends DataTypeAstBuilder
         Project(
           Seq(Alias(expression(boolExpr), "condition")()),
           OneRowRelation()))}
-    val body = visitCompoundBodyImpl(ctx.compoundBody(), None, allowVarDeclare 
= false, labelCtx)
+    val body = visitCompoundBodyImpl(
+      ctx.compoundBody(),
+      None,
+      allowVarDeclare = false,
+      labelCtx,
+      isScope = false
+    )
     labelCtx.exitLabeledScope(Option(ctx.beginLabel()))
 
     RepeatStatement(condition, body, Some(labelText))
@@ -358,7 +392,13 @@ class AstBuilder extends DataTypeAstBuilder
       SingleStatement(visitQuery(queryCtx))
     }
     val varName = Option(ctx.multipartIdentifier()).map(_.getText)
-    val body = visitCompoundBodyImpl(ctx.compoundBody(), None, allowVarDeclare 
= false, labelCtx)
+    val body = visitCompoundBodyImpl(
+      ctx.compoundBody(),
+      None,
+      allowVarDeclare = false,
+      labelCtx,
+      isScope = false
+    )
     labelCtx.exitLabeledScope(Option(ctx.beginLabel()))
 
     ForStatement(query, varName, body, Some(labelText))
@@ -431,7 +471,13 @@ class AstBuilder extends DataTypeAstBuilder
       labelCtx: SqlScriptingLabelContext): LoopStatement = {
     val labelText =
       labelCtx.enterLabeledScope(Option(ctx.beginLabel()), 
Option(ctx.endLabel()))
-    val body = visitCompoundBodyImpl(ctx.compoundBody(), None, allowVarDeclare 
= false, labelCtx)
+    val body = visitCompoundBodyImpl(
+      ctx.compoundBody(),
+      None,
+      allowVarDeclare = false,
+      labelCtx,
+      isScope = false
+    )
     labelCtx.exitLabeledScope(Option(ctx.beginLabel()))
 
     LoopStatement(body, Some(labelText))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
index 4faf1f5d2667..207c586996fd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SqlScriptingLogicalPlans.scala
@@ -65,13 +65,14 @@ case class SingleStatement(parsedPlan: LogicalPlan)
  */
 case class CompoundBody(
     collection: Seq[CompoundPlanStatement],
-    label: Option[String]) extends Command with CompoundPlanStatement {
+    label: Option[String],
+    isScope: Boolean) extends Command with CompoundPlanStatement {
 
   override def children: Seq[LogicalPlan] = collection
 
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = {
-    CompoundBody(newChildren.map(_.asInstanceOf[CompoundPlanStatement]), label)
+    CompoundBody(newChildren.map(_.asInstanceOf[CompoundPlanStatement]), 
label, isScope)
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
index 59252f622918..71b44cbbd070 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
@@ -35,27 +35,47 @@ class SqlScriptingExecution(
     session: SparkSession,
     args: Map[String, Expression]) extends Iterator[DataFrame] {
 
-  // Build the execution plan for the script.
-  private val executionPlan: Iterator[CompoundStatementExec] =
-    SqlScriptingInterpreter(session).buildExecutionPlan(sqlScript, args)
+  private val interpreter = SqlScriptingInterpreter(session)
 
-  private var current = getNextResult
+  // Frames to keep what is being executed.
+  private val context: SqlScriptingExecutionContext = {
+    val ctx = new SqlScriptingExecutionContext()
+    val executionPlan = interpreter.buildExecutionPlan(sqlScript, args, ctx)
+    // Add frame which represents SQL Script to the context.
+    ctx.frames.addOne(new 
SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+    // Enter the scope of the top level compound.
+    // We don't need to exit this scope explicitly as it will be done 
automatically
+    // when the frame is removed during iteration.
+    executionPlan.enterScope()
+    ctx
+  }
+
+  private var current: Option[DataFrame] = getNextResult
 
   override def hasNext: Boolean = current.isDefined
 
   override def next(): DataFrame = {
-    if (!hasNext) throw SparkException.internalError("No more elements to 
iterate through.")
-    val nextDataFrame = current.get
-    current = getNextResult
-    nextDataFrame
+    current match {
+      case None => throw SparkException.internalError("No more elements to 
iterate through.")
+      case Some(result) =>
+        current = getNextResult
+        result
+    }
+  }
+
+  /** Helper method to iterate get next statements from the first available 
frame. */
+  private def getNextStatement: Option[CompoundStatementExec] = {
+    while (context.frames.nonEmpty && !context.frames.last.hasNext) {
+      context.frames.remove(context.frames.size - 1)
+    }
+    if (context.frames.nonEmpty) {
+      return Some(context.frames.last.next())
+    }
+    None
   }
 
   /** Helper method to iterate through statements until next result statement 
is encountered. */
   private def getNextResult: Option[DataFrame] = {
-
-    def getNextStatement: Option[CompoundStatementExec] =
-      if (executionPlan.hasNext) Some(executionPlan.next()) else None
-
     var currentStatement = getNextStatement
     // While we don't have a result statement, execute the statements.
     while (currentStatement.isDefined) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
new file mode 100644
index 000000000000..5a2ef62e3bb7
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.scripting
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkException
+
+/**
+ * SQL scripting execution context - keeps track of the current execution 
state.
+ */
+class SqlScriptingExecutionContext {
+  // List of frames that are currently active.
+  val frames: ListBuffer[SqlScriptingExecutionFrame] = ListBuffer.empty
+
+  def enterScope(label: String): Unit = {
+    if (frames.isEmpty) {
+      throw SparkException.internalError("Cannot enter scope: no frames.")
+    }
+    frames.last.enterScope(label)
+  }
+
+  def exitScope(label: String): Unit = {
+    if (frames.isEmpty) {
+      throw SparkException.internalError("Cannot exit scope: no frames.")
+    }
+    frames.last.exitScope(label)
+  }
+}
+
+/**
+ * SQL scripting executor - executes script and returns result statements.
+ * This supports returning multiple result statements from a single script.
+ *
+ * @param executionPlan CompoundBody which need to be executed.
+ */
+class SqlScriptingExecutionFrame(
+    executionPlan: Iterator[CompoundStatementExec]) extends 
Iterator[CompoundStatementExec] {
+
+  // List of scopes that are currently active.
+  private val scopes: ListBuffer[SqlScriptingExecutionScope] = ListBuffer.empty
+
+  override def hasNext: Boolean = executionPlan.hasNext
+
+  override def next(): CompoundStatementExec = {
+    if (!hasNext) throw SparkException.internalError("No more elements to 
iterate through.")
+    executionPlan.next()
+  }
+
+  def enterScope(label: String): Unit = {
+    scopes.addOne(new SqlScriptingExecutionScope(label))
+  }
+
+  def exitScope(label: String): Unit = {
+    if (scopes.isEmpty) {
+      throw SparkException.internalError("Cannot exit scope: no scopes to 
exit.")
+    }
+
+    // Remove all scopes until the one with the given label.
+    while (scopes.nonEmpty && scopes.last.label != label) {
+      scopes.remove(scopes.length - 1)
+    }
+
+    if (scopes.nonEmpty) {
+      scopes.remove(scopes.length - 1)
+    }
+  }
+}
+
+/**
+ * SQL scripting execution scope - keeps track of the current execution scope.
+ *
+ * @param label
+ *   Label of the scope.
+ */
+class SqlScriptingExecutionScope(val label: String)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
index 99719ce19e3d..2d50d37e2cb8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala
@@ -115,12 +115,15 @@ trait NonLeafStatementExec extends CompoundStatementExec {
  *   Whether the statement originates from the SQL script or it is created 
during the
  *   interpretation. Example: DropVariable statements are automatically 
created at the end of each
  *   compound.
+ * @param context
+ *   SqlScriptingExecutionContext keeps the execution state of current script.
  */
 class SingleStatementExec(
     var parsedPlan: LogicalPlan,
     override val origin: Origin,
     val args: Map[String, Expression],
-    override val isInternal: Boolean)
+    override val isInternal: Boolean,
+    context: SqlScriptingExecutionContext)
   extends LeafStatementExec with WithOrigin {
 
   /**
@@ -177,12 +180,54 @@ class NoOpStatementExec extends LeafStatementExec {
  *   Executable nodes for nested statements within the CompoundBody.
  * @param label
  *   Label set by user to CompoundBody or None otherwise.
+ * @param isScope
+ *   Flag that indicates whether Compound Body is scope or not.
+ * @param context
+ *   SqlScriptingExecutionContext keeps the execution state of current script.
  */
-class CompoundBodyExec(statements: Seq[CompoundStatementExec], label: 
Option[String] = None)
+class CompoundBodyExec(
+    statements: Seq[CompoundStatementExec],
+    label: Option[String] = None,
+    isScope: Boolean,
+    context: SqlScriptingExecutionContext)
   extends NonLeafStatementExec {
 
+  private object ScopeStatus extends Enumeration {
+    type ScopeStatus = Value
+    val NOT_ENTERED, INSIDE, EXITED = Value
+  }
+
   private var localIterator = statements.iterator
   private var curr = if (localIterator.hasNext) Some(localIterator.next()) 
else None
+  private var scopeStatus = ScopeStatus.NOT_ENTERED
+
+  /**
+   * Enter scope represented by this compound statement.
+   *
+   * This operation needs to be idempotent because it is called multiple times 
during
+   * iteration, but it should be executed only once when compound body that 
represent
+   * scope is encountered for the first time.
+   */
+  def enterScope(): Unit = {
+    // This check makes this operation idempotent.
+    if (isScope && scopeStatus == ScopeStatus.NOT_ENTERED) {
+      scopeStatus = ScopeStatus.INSIDE
+      context.enterScope(label.get)
+    }
+  }
+
+  /**
+   * Exit scope represented by this compound statement.
+   *
+   * Even though this operation is called exactly once, we are making it 
idempotent.
+   */
+  protected def exitScope(): Unit = {
+    // This check makes this operation idempotent.
+    if (isScope && scopeStatus == ScopeStatus.INSIDE) {
+      scopeStatus = ScopeStatus.EXITED
+      context.exitScope(label.get)
+    }
+  }
 
   /** Used to stop the iteration in cases when LEAVE statement is encountered. 
*/
   private var stopIteration = false
@@ -218,6 +263,11 @@ class CompoundBodyExec(statements: 
Seq[CompoundStatementExec], label: Option[Str
             statement
           case Some(body: NonLeafStatementExec) =>
             if (body.getTreeIterator.hasNext) {
+              body match {
+                // Scope will be entered only once per compound because enter 
scope is idempotent.
+                case compoundBodyExec: CompoundBodyExec => 
compoundBodyExec.enterScope()
+                case _ => // pass
+              }
               body.getTreeIterator.next() match {
                 case leaveStatement: LeaveStatementExec =>
                   handleLeaveStatement(leaveStatement)
@@ -228,6 +278,11 @@ class CompoundBodyExec(statements: 
Seq[CompoundStatementExec], label: Option[Str
                 case other => other
               }
             } else {
+              body match {
+                // Exit scope when there are no more statements to iterate 
through.
+                case compoundBodyExec: CompoundBodyExec => 
compoundBodyExec.exitScope()
+                case _ => // pass
+              }
               curr = if (localIterator.hasNext) Some(localIterator.next()) 
else None
               next()
             }
@@ -244,6 +299,7 @@ class CompoundBodyExec(statements: 
Seq[CompoundStatementExec], label: Option[Str
     localIterator = statements.iterator
     curr = if (localIterator.hasNext) Some(localIterator.next()) else None
     stopIteration = false
+    scopeStatus = ScopeStatus.NOT_ENTERED
   }
 
   /** Actions to do when LEAVE statement is encountered, to stop the execution 
of this compound. */
@@ -252,6 +308,9 @@ class CompoundBodyExec(statements: 
Seq[CompoundStatementExec], label: Option[Str
       // Stop the iteration.
       stopIteration = true
 
+      // Exit scope if leave statement is encountered.
+      exitScope()
+
       // TODO: Variable cleanup (once we add SQL script execution logic).
       // TODO: Add interpreter tests as well.
 
@@ -268,6 +327,9 @@ class CompoundBodyExec(statements: 
Seq[CompoundStatementExec], label: Option[Str
       // Stop the iteration.
       stopIteration = true
 
+      // Exit scope if iterate statement is encountered.
+      exitScope()
+
       // TODO: Variable cleanup (once we add SQL script execution logic).
       // TODO: Add interpreter tests as well.
 
@@ -680,13 +742,15 @@ class LoopStatementExec(
  * @param body Executable node for the body.
  * @param label Label set to ForStatement by user or None otherwise.
  * @param session Spark session that SQL script is executed within.
+ * @param context SqlScriptingExecutionContext keeps the execution state of 
current script.
  */
 class ForStatementExec(
     query: SingleStatementExec,
     variableName: Option[String],
     body: CompoundBodyExec,
     val label: Option[String],
-    session: SparkSession) extends NonLeafStatementExec {
+    session: SparkSession,
+    context: SqlScriptingExecutionContext) extends NonLeafStatementExec {
 
   private object ForState extends Enumeration {
     val VariableAssignment, Body, VariableCleanup = Value
@@ -848,7 +912,10 @@ class ForStatementExec(
     else {
       // create compound body for dropping nodes after execution is complete
       dropVariablesExec = new CompoundBodyExec(
-        variablesMap.keys.toSeq.map(colName => createDropVarExec(colName))
+        variablesMap.keys.toSeq.map(colName => createDropVarExec(colName)),
+        None,
+        isScope = false,
+        context
       )
       ForState.VariableCleanup
     }
@@ -861,7 +928,7 @@ class ForStatementExec(
       defaultExpression,
       replace = true
     )
-    new SingleStatementExec(declareVariable, Origin(), Map.empty, isInternal = 
true)
+    new SingleStatementExec(declareVariable, Origin(), Map.empty, isInternal = 
true, context)
   }
 
   private def createSetVarExec(varName: String, variable: Expression): 
SingleStatementExec = {
@@ -871,12 +938,17 @@ class ForStatementExec(
     )
     val setIdentifierToCurrentRow =
       SetVariable(Seq(UnresolvedAttribute(varName)), projectNamedStruct)
-    new SingleStatementExec(setIdentifierToCurrentRow, Origin(), Map.empty, 
isInternal = true)
+    new SingleStatementExec(
+      setIdentifierToCurrentRow,
+      Origin(),
+      Map.empty,
+      isInternal = true,
+      context)
   }
 
   private def createDropVarExec(varName: String): SingleStatementExec = {
     val dropVar = DropVariable(UnresolvedIdentifier(Seq(varName)), ifExists = 
true)
-    new SingleStatementExec(dropVar, Origin(), Map.empty, isInternal = true)
+    new SingleStatementExec(dropVar, Origin(), Map.empty, isInternal = true, 
context)
   }
 
   override def getTreeIterator: Iterator[CompoundStatementExec] = treeIterator
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala
index 5d3edeefc532..7d00bbb3538d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala
@@ -40,13 +40,14 @@ case class SqlScriptingInterpreter(session: SparkSession) {
    * @param args
    *   A map of parameter names to SQL literal expressions.
    * @return
-   *   Iterator through collection of statements to be executed.
+   *   Top level CompoundBodyExec representing SQL Script to be executed.
    */
   def buildExecutionPlan(
       compound: CompoundBody,
-      args: Map[String, Expression]): Iterator[CompoundStatementExec] = {
-    transformTreeIntoExecutable(compound, args)
-      .asInstanceOf[CompoundBodyExec].getTreeIterator
+      args: Map[String, Expression],
+      context: SqlScriptingExecutionContext): CompoundBodyExec = {
+    transformTreeIntoExecutable(compound, args, context)
+      .asInstanceOf[CompoundBodyExec]
   }
 
   /**
@@ -74,9 +75,10 @@ case class SqlScriptingInterpreter(session: SparkSession) {
    */
   private def transformTreeIntoExecutable(
       node: CompoundPlanStatement,
-      args: Map[String, Expression]): CompoundStatementExec =
+      args: Map[String, Expression],
+      context: SqlScriptingExecutionContext): CompoundStatementExec =
     node match {
-      case CompoundBody(collection, label) =>
+      case CompoundBody(collection, label, isScope) =>
         // TODO [SPARK-48530]: Current logic doesn't support scoped variables 
and shadowing.
         val variables = collection.flatMap {
           case st: SingleStatement => getDeclareVarNameFromPlan(st.parsedPlan)
@@ -84,16 +86,20 @@ case class SqlScriptingInterpreter(session: SparkSession) {
         }
         val dropVariables = variables
           .map(varName => DropVariable(varName, ifExists = true))
-          .map(new SingleStatementExec(_, Origin(), args, isInternal = true))
+          .map(new SingleStatementExec(_, Origin(), args, isInternal = true, 
context))
           .reverse
 
-        val statements =
-          collection.map(st => transformTreeIntoExecutable(st, args)) ++ 
dropVariables match {
+        val statements = collection
+          .map(st => transformTreeIntoExecutable(st, args, context)) ++ 
dropVariables match {
             case Nil => Seq(new NoOpStatementExec)
             case s => s
           }
 
-        new CompoundBodyExec(statements, label)
+        new CompoundBodyExec(
+          statements,
+          label,
+          isScope,
+          context)
 
       case IfElseStatement(conditions, conditionalBodies, elseBody) =>
         val conditionsExec = conditions.map(condition =>
@@ -101,11 +107,12 @@ case class SqlScriptingInterpreter(session: SparkSession) 
{
             condition.parsedPlan,
             condition.origin,
             args,
-            isInternal = false))
+            isInternal = false,
+            context))
         val conditionalBodiesExec = conditionalBodies.map(body =>
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec])
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec])
         val unconditionalBodiesExec = elseBody.map(body =>
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec])
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec])
         new IfElseStatementExec(
           conditionsExec, conditionalBodiesExec, unconditionalBodiesExec, 
session)
 
@@ -115,11 +122,12 @@ case class SqlScriptingInterpreter(session: SparkSession) 
{
             condition.parsedPlan,
             condition.origin,
             args,
-            isInternal = false))
+            isInternal = false,
+            context))
         val conditionalBodiesExec = conditionalBodies.map(body =>
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec])
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec])
         val unconditionalBodiesExec = elseBody.map(body =>
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec])
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec])
         new CaseStatementExec(
           conditionsExec, conditionalBodiesExec, unconditionalBodiesExec, 
session)
 
@@ -129,9 +137,10 @@ case class SqlScriptingInterpreter(session: SparkSession) {
             condition.parsedPlan,
             condition.origin,
             args,
-            isInternal = false)
+            isInternal = false,
+            context)
         val bodyExec =
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec]
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec]
         new WhileStatementExec(conditionExec, bodyExec, label, session)
 
       case RepeatStatement(condition, body, label) =>
@@ -140,13 +149,14 @@ case class SqlScriptingInterpreter(session: SparkSession) 
{
             condition.parsedPlan,
             condition.origin,
             args,
-            isInternal = false)
+            isInternal = false,
+            context)
         val bodyExec =
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec]
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec]
         new RepeatStatementExec(conditionExec, bodyExec, label, session)
 
       case LoopStatement(body, label) =>
-        val bodyExec = transformTreeIntoExecutable(body, args)
+        val bodyExec = transformTreeIntoExecutable(body, args, context)
           .asInstanceOf[CompoundBodyExec]
         new LoopStatementExec(bodyExec, label)
 
@@ -156,10 +166,11 @@ case class SqlScriptingInterpreter(session: SparkSession) 
{
             query.parsedPlan,
             query.origin,
             args,
-            isInternal = false)
+            isInternal = false,
+            context)
         val bodyExec =
-          transformTreeIntoExecutable(body, 
args).asInstanceOf[CompoundBodyExec]
-        new ForStatementExec(queryExec, variableNameOpt, bodyExec, label, 
session)
+          transformTreeIntoExecutable(body, args, 
context).asInstanceOf[CompoundBodyExec]
+        new ForStatementExec(queryExec, variableNameOpt, bodyExec, label, 
session, context)
 
       case leaveStatement: LeaveStatement =>
         new LeaveStatementExec(leaveStatement.label)
@@ -172,6 +183,7 @@ case class SqlScriptingInterpreter(session: SparkSession) {
           sparkStatement.parsedPlan,
           sparkStatement.origin,
           args,
-          isInternal = false)
+          isInternal = false,
+          context)
     }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNodeSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNodeSuite.scala
index a997b5beadd3..325c8ce380c6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNodeSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNodeSuite.scala
@@ -32,6 +32,35 @@ import org.apache.spark.sql.types.{IntegerType, StructField, 
StructType}
  */
 class SqlScriptingExecutionNodeSuite extends SparkFunSuite with 
SharedSparkSession {
   // Helpers
+  case class TestCompoundBody(
+      statements: Seq[CompoundStatementExec],
+      label: Option[String] = None,
+      isScope: Boolean = false,
+      context: SqlScriptingExecutionContext = null)
+    extends CompoundBodyExec(statements, label, isScope, context) {
+
+    // No-op to remove unnecessary logic for these tests.
+    override def enterScope(): Unit = ()
+
+    // No-op to remove unnecessary logic for these tests.
+    override def exitScope(): Unit = ()
+  }
+
+  case class TestForStatement(
+      query: SingleStatementExec,
+      variableName: Option[String],
+      body: CompoundBodyExec,
+      override val label: Option[String],
+      session: SparkSession,
+      context: SqlScriptingExecutionContext = null)
+    extends ForStatementExec(
+      query,
+      variableName,
+      body,
+      label,
+      session,
+      context)
+
   case class TestLeafStatement(testVal: String) extends LeafStatementExec {
     override def reset(): Unit = ()
   }
@@ -41,7 +70,9 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
       parsedPlan = Project(Seq(Alias(Literal(condVal), description)()), 
OneRowRelation()),
       Origin(startIndex = Some(0), stopIndex = Some(description.length)),
       Map.empty,
-      isInternal = false)
+      isInternal = false,
+      null
+    )
 
   case class DummyLogicalPlan() extends LeafNode {
     override def output: Seq[Attribute] = Seq.empty
@@ -53,7 +84,9 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
       parsedPlan = DummyLogicalPlan(),
       Origin(startIndex = Some(0), stopIndex = Some(description.length)),
       Map.empty,
-      isInternal = false)
+      isInternal = false,
+      null
+    )
 
   class LoopBooleanConditionEvaluator(condition: TestLoopCondition) {
     private var callCount: Int = 0
@@ -71,7 +104,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
 
   case class TestWhile(
       condition: TestLoopCondition,
-      body: CompoundBodyExec,
+      body: TestCompoundBody,
       label: Option[String] = None)
     extends WhileStatementExec(condition, body, label, spark) {
 
@@ -84,7 +117,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
 
   case class TestRepeat(
       condition: TestLoopCondition,
-      body: CompoundBodyExec,
+      body: TestCompoundBody,
       label: Option[String] = None)
     extends RepeatStatementExec(condition, body, label, spark) {
 
@@ -100,7 +133,8 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
         DummyLogicalPlan(),
         Origin(startIndex = Some(0), stopIndex = Some(description.length)),
         Map.empty,
-        isInternal = false) {
+        isInternal = false,
+        null) {
     override def buildDataFrame(session: SparkSession): DataFrame = {
       val data = Seq.range(0, numberOfRows).map(Row(_))
       val schema = List(StructField(columnName, IntegerType))
@@ -120,7 +154,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
       case loopStmt: LoopStatementExec => loopStmt.label.get
       case leaveStmt: LeaveStatementExec => leaveStmt.label
       case iterateStmt: IterateStatementExec => iterateStmt.label
-      case forStmt: ForStatementExec => forStmt.label.get
+      case forStmt: TestForStatement => forStmt.label.get
       case dropStmt: SingleStatementExec if 
dropStmt.parsedPlan.isInstanceOf[DropVariable]
         => "DropVariable"
       case _ => fail("Unexpected statement type")
@@ -128,13 +162,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
 
   // Tests
   test("test body - single statement") {
-    val iter = new 
CompoundBodyExec(Seq(TestLeafStatement("one"))).getTreeIterator
+    val iter = TestCompoundBody(Seq(TestLeafStatement("one"))).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
     assert(statements === Seq("one"))
   }
 
   test("test body - no nesting") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       Seq(
         TestLeafStatement("one"),
         TestLeafStatement("two"),
@@ -145,26 +179,26 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("test body - nesting") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       Seq(
-        new CompoundBodyExec(Seq(TestLeafStatement("one"), 
TestLeafStatement("two"))),
+        TestCompoundBody(Seq(TestLeafStatement("one"), 
TestLeafStatement("two"))),
         TestLeafStatement("three"),
-        new CompoundBodyExec(Seq(TestLeafStatement("four"), 
TestLeafStatement("five")))))
+        TestCompoundBody(Seq(TestLeafStatement("four"), 
TestLeafStatement("five")))))
       .getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
     assert(statements === Seq("one", "two", "three", "four", "five"))
   }
 
   test("if else - enter body of the IF clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = true, description = "con1")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+          TestCompoundBody(Seq(TestLeafStatement("body1")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body2")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body2")))),
         session = spark
       )
     )).getTreeIterator
@@ -173,15 +207,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else - enter body of the ELSE clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+          TestCompoundBody(Seq(TestLeafStatement("body1")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body2")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body2")))),
         session = spark
       )
     )).getTreeIterator
@@ -190,17 +224,17 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else if - enter body of the IF clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = true, description = "con1"),
           TestIfElseCondition(condVal = false, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body3")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body3")))),
         session = spark
       )
     )).getTreeIterator
@@ -209,17 +243,17 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else if - enter body of the ELSE IF clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = true, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body3")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body3")))),
         session = spark
       )
     )).getTreeIterator
@@ -228,7 +262,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
   }
 
   test("if else if - enter body of the second ELSE IF clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
@@ -236,11 +270,11 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
           TestIfElseCondition(condVal = true, description = "con3")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body3")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2"))),
+          TestCompoundBody(Seq(TestLeafStatement("body3")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body4")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body4")))),
         session = spark
       )
     )).getTreeIterator
@@ -249,17 +283,17 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else if - enter body of the ELSE clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = false, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body3")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body3")))),
         session = spark
       )
     )).getTreeIterator
@@ -268,15 +302,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else if - without else (successful check)") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = true, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
         elseBody = None,
         session = spark
@@ -287,15 +321,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("if else if - without else (unsuccessful checks)") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new IfElseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = false, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
         elseBody = None,
         session = spark
@@ -306,10 +340,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("while - doesn't enter body") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestWhile(
         condition = TestLoopCondition(condVal = true, reps = 0, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -317,10 +351,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("while - enters body once") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestWhile(
         condition = TestLoopCondition(condVal = true, reps = 1, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -328,10 +362,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("while - enters body with multiple statements multiple times") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestWhile(
         condition = TestLoopCondition(condVal = true, reps = 2, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           TestLeafStatement("statement2")))
       )
@@ -342,13 +376,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("nested while - 2 times outer 2 times inner") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestWhile(
         condition = TestLoopCondition(condVal = true, reps = 2, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestWhile(
             condition = TestLoopCondition(condVal = true, reps = 2, 
description = "con2"),
-            body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+            body = TestCompoundBody(Seq(TestLeafStatement("body1")))
           ))
         )
       )
@@ -361,10 +395,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("repeat - true condition") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestRepeat(
         condition = TestLoopCondition(condVal = false, reps = 0, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -372,10 +406,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("repeat - condition false once") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestRepeat(
         condition = TestLoopCondition(condVal = false, reps = 1, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -383,10 +417,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("repeat - enters body with multiple statements multiple times") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestRepeat(
         condition = TestLoopCondition(condVal = false, reps = 2, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           TestLeafStatement("statement2")))
       )
@@ -397,13 +431,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("nested repeat") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       TestRepeat(
         condition = TestLoopCondition(condVal = false, reps = 2, description = 
"con1"),
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestRepeat(
             condition = TestLoopCondition(condVal = false, reps = 2, 
description = "con2"),
-            body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+            body = TestCompoundBody(Seq(TestLeafStatement("body1")))
           ))
         )
       )
@@ -419,7 +453,7 @@ class SqlScriptingExecutionNodeSuite extends SparkFunSuite 
with SharedSparkSessi
   }
 
   test("leave compound block") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestLeafStatement("one"),
         new LeaveStatementExec("lbl")
@@ -431,11 +465,11 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("leave while loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestWhile(
           condition = TestLoopCondition(condVal = true, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestLeafStatement("body1"),
             new LeaveStatementExec("lbl"))
           ),
@@ -448,11 +482,11 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("leave repeat loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestRepeat(
           condition = TestLoopCondition(condVal = false, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestLeafStatement("body1"),
             new LeaveStatementExec("lbl"))
           ),
@@ -465,11 +499,11 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("iterate while loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestWhile(
           condition = TestLoopCondition(condVal = true, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestLeafStatement("body1"),
             new IterateStatementExec("lbl"),
             TestLeafStatement("body2"))
@@ -483,11 +517,11 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("iterate repeat loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestRepeat(
           condition = TestLoopCondition(condVal = false, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestLeafStatement("body1"),
             new IterateStatementExec("lbl"),
             TestLeafStatement("body2"))
@@ -502,14 +536,14 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("leave outer loop from nested while loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestWhile(
           condition = TestLoopCondition(condVal = true, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestWhile(
               condition = TestLoopCondition(condVal = true, reps = 2, 
description = "con2"),
-              body = new CompoundBodyExec(Seq(
+              body = TestCompoundBody(Seq(
                 TestLeafStatement("body1"),
                 new LeaveStatementExec("lbl"))
               ),
@@ -525,14 +559,14 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("leave outer loop from nested repeat loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestRepeat(
           condition = TestLoopCondition(condVal = false, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestRepeat(
               condition = TestLoopCondition(condVal = false, reps = 2, 
description = "con2"),
-              body = new CompoundBodyExec(Seq(
+              body = TestCompoundBody(Seq(
                 TestLeafStatement("body1"),
                 new LeaveStatementExec("lbl"))
               ),
@@ -548,14 +582,14 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("iterate outer loop from nested while loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestWhile(
           condition = TestLoopCondition(condVal = true, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestWhile(
               condition = TestLoopCondition(condVal = true, reps = 2, 
description = "con2"),
-              body = new CompoundBodyExec(Seq(
+              body = TestCompoundBody(Seq(
                 TestLeafStatement("body1"),
                 new IterateStatementExec("lbl"),
                 TestLeafStatement("body2"))
@@ -575,14 +609,14 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("iterate outer loop from nested repeat loop") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         TestRepeat(
           condition = TestLoopCondition(condVal = false, reps = 2, description 
= "con1"),
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestRepeat(
               condition = TestLoopCondition(condVal = false, reps = 2, 
description = "con2"),
-              body = new CompoundBodyExec(Seq(
+              body = TestCompoundBody(Seq(
                 TestLeafStatement("body1"),
                 new IterateStatementExec("lbl"),
                 TestLeafStatement("body2"))
@@ -602,17 +636,17 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("searched case - enter first WHEN clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new CaseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = true, description = "con1"),
           TestIfElseCondition(condVal = false, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body3")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body3")))),
         session = spark
       )
     )).getTreeIterator
@@ -621,15 +655,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("searched case - enter body of the ELSE clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new CaseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+          TestCompoundBody(Seq(TestLeafStatement("body1")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body2")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body2")))),
         session = spark
       )
     )).getTreeIterator
@@ -638,17 +672,17 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("searched case - enter second WHEN clause") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new CaseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = true, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
-        elseBody = Some(new CompoundBodyExec(Seq(TestLeafStatement("body3")))),
+        elseBody = Some(TestCompoundBody(Seq(TestLeafStatement("body3")))),
         session = spark
       )
     )).getTreeIterator
@@ -657,15 +691,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("searched case - without else (successful check)") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new CaseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = true, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
         elseBody = None,
         session = spark
@@ -676,15 +710,15 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("searched case - without else (unsuccessful checks)") {
-    val iter = new CompoundBodyExec(Seq(
+    val iter = TestCompoundBody(Seq(
       new CaseStatementExec(
         conditions = Seq(
           TestIfElseCondition(condVal = false, description = "con1"),
           TestIfElseCondition(condVal = false, description = "con2")
         ),
         conditionalBodies = Seq(
-          new CompoundBodyExec(Seq(TestLeafStatement("body1"))),
-          new CompoundBodyExec(Seq(TestLeafStatement("body2")))
+          TestCompoundBody(Seq(TestLeafStatement("body1"))),
+          TestCompoundBody(Seq(TestLeafStatement("body2")))
         ),
         elseBody = None,
         session = spark
@@ -695,10 +729,10 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("loop statement with leave") {
-    val iter = new CompoundBodyExec(
+    val iter = TestCompoundBody(
       statements = Seq(
         new LoopStatementExec(
-          body = new CompoundBodyExec(Seq(
+          body = TestCompoundBody(Seq(
             TestLeafStatement("body1"),
             new LeaveStatementExec("lbl"))
           ),
@@ -711,13 +745,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - enters body once") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(1, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -729,13 +763,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - enters body with multiple statements multiple times") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(
+        body = TestCompoundBody(
           Seq(TestLeafStatement("statement1"), TestLeafStatement("statement2"))
         )
       )
@@ -752,13 +786,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - empty result") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(0, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -766,23 +800,24 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - nested") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
-          new ForStatementExec(
+        body = TestCompoundBody(Seq(
+          TestForStatement(
             query = MockQuery(2, "intCol1", "query2"),
             variableName = Some("y"),
             label = Some("for2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(TestLeafStatement("body")))
+            body = TestCompoundBody(Seq(TestLeafStatement("body")))
           )
         ))
-      )
-    )).getTreeIterator
+      )),
+      label = Some("lbl")
+    ).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
     assert(statements === Seq(
       "body",
@@ -799,13 +834,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - enters body once") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(1, "intCol", "query1"),
         variableName = None,
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -816,13 +851,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - enters body with multiple statements 
multiple times") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           TestLeafStatement("statement2")))
       )
@@ -835,13 +870,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - empty result") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(0, "intCol", "query1"),
         variableName = None,
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(TestLeafStatement("body1")))
+        body = TestCompoundBody(Seq(TestLeafStatement("body1")))
       )
     )).getTreeIterator
     val statements = iter.map(extractStatementValue).toSeq
@@ -849,19 +884,19 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - nested") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("for1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
-          new ForStatementExec(
+        body = TestCompoundBody(Seq(
+          TestForStatement(
             query = MockQuery(2, "intCol1", "query2"),
             variableName = None,
             label = Some("for2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(TestLeafStatement("body")))
+            body = TestCompoundBody(Seq(TestLeafStatement("body")))
           )
         ))
       )
@@ -877,13 +912,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - iterate") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           new IterateStatementExec("lbl1"),
           TestLeafStatement("statement2")))
@@ -901,13 +936,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - leave") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           new LeaveStatementExec("lbl1"),
           TestLeafStatement("statement2")))
@@ -918,20 +953,20 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - nested - iterate outer loop") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("outer_body"),
-          new ForStatementExec(
+          TestForStatement(
             query = MockQuery(2, "intCol1", "query2"),
             variableName = Some("y"),
             label = Some("lbl2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(
+            body = TestCompoundBody(Seq(
               TestLeafStatement("body1"),
               new IterateStatementExec("lbl1"),
               TestLeafStatement("body2")))
@@ -953,19 +988,19 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement - nested - leave outer loop") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = Some("x"),
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
-          new ForStatementExec(
+        body = TestCompoundBody(Seq(
+          TestForStatement(
             query = MockQuery(2, "intCol", "query2"),
             variableName = Some("y"),
             label = Some("lbl2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(
+            body = TestCompoundBody(Seq(
               TestLeafStatement("body1"),
               new LeaveStatementExec("lbl1"),
               TestLeafStatement("body2")))
@@ -978,13 +1013,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - iterate") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           new IterateStatementExec("lbl1"),
           TestLeafStatement("statement2")))
@@ -998,13 +1033,13 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - leave") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("statement1"),
           new LeaveStatementExec("lbl1"),
           TestLeafStatement("statement2")))
@@ -1015,20 +1050,20 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - nested - iterate outer loop") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
+        body = TestCompoundBody(Seq(
           TestLeafStatement("outer_body"),
-          new ForStatementExec(
+          TestForStatement(
             query = MockQuery(2, "intCol1", "query2"),
             variableName = None,
             label = Some("lbl2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(
+            body = TestCompoundBody(Seq(
               TestLeafStatement("body1"),
               new IterateStatementExec("lbl1"),
               TestLeafStatement("body2")))
@@ -1044,19 +1079,19 @@ class SqlScriptingExecutionNodeSuite extends 
SparkFunSuite with SharedSparkSessi
   }
 
   test("for statement no variable - nested - leave outer loop") {
-    val iter = new CompoundBodyExec(Seq(
-      new ForStatementExec(
+    val iter = TestCompoundBody(Seq(
+      TestForStatement(
         query = MockQuery(2, "intCol", "query1"),
         variableName = None,
         label = Some("lbl1"),
         session = spark,
-        body = new CompoundBodyExec(Seq(
-          new ForStatementExec(
+        body = TestCompoundBody(Seq(
+          TestForStatement(
             query = MockQuery(2, "intCol1", "query2"),
             variableName = None,
             label = Some("lbl2"),
             session = spark,
-            body = new CompoundBodyExec(Seq(
+            body = TestCompoundBody(Seq(
               TestLeafStatement("body1"),
               new LeaveStatementExec("lbl1"),
               TestLeafStatement("body2")))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
index 2ec42c4554e0..20997504b15e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreterSuite.scala
@@ -45,8 +45,14 @@ class SqlScriptingInterpreterSuite extends QueryTest with 
SharedSparkSession {
       args: Map[String, Expression] = Map.empty): Array[DataFrame] = {
     val interpreter = SqlScriptingInterpreter(spark)
     val compoundBody = 
spark.sessionState.sqlParser.parsePlan(sqlText).asInstanceOf[CompoundBody]
-    val executionPlan = interpreter.buildExecutionPlan(compoundBody, args)
-    executionPlan.flatMap {
+
+    // Initialize context so scopes can be entered correctly.
+    val context = new SqlScriptingExecutionContext()
+    val executionPlan = interpreter.buildExecutionPlan(compoundBody, args, 
context)
+    context.frames.addOne(new 
SqlScriptingExecutionFrame(executionPlan.getTreeIterator))
+    executionPlan.enterScope()
+
+    executionPlan.getTreeIterator.flatMap {
       case statement: SingleStatementExec =>
         if (statement.isExecuted) {
           None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to