This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 16bc416be9 [VL] Prepare shim API for breaking change in SPARK-48610 
(#7445)
16bc416be9 is described below

commit 16bc416be9999f43a329ceaade208c866ad7f2df
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Oct 10 08:04:54 2024 +0800

    [VL] Prepare shim API for breaking change in SPARK-48610 (#7445)
---
 .../spark/sql/execution/GlutenExplainUtils.scala   | 154 +++++++++++----------
 .../spark/sql/execution/GlutenImplicits.scala      |  22 +--
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  15 ++
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |  13 ++
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  13 ++
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  13 ++
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  13 ++
 7 files changed, 160 insertions(+), 83 deletions(-)

diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 43b74c8836..fa697789c8 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.execution.WholeStageTransformer
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.FallbackTags
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.sql.AnalysisException
@@ -49,7 +50,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       p: SparkPlan,
       reason: String,
       fallbackNodeToReason: mutable.HashMap[String, String]): Unit = {
-    p.getTagValue(QueryPlan.OP_ID_TAG).foreach {
+    SparkShimLoader.getSparkShims.getOperatorId(p).foreach {
       opId =>
         // e.g., 002 project, it is used to help analysis by `substring(4)`
         val formattedNodeName = f"$opId%03d ${p.nodeName}"
@@ -150,94 +151,99 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper 
{
   // scalastyle:off
   /**
    * Given a input physical plan, performs the following tasks.
-   *   1. Generates the explain output for the input plan excluding the 
subquery plans.
-   *   2. Generates the explain output for each subquery referenced in the 
plan.
+   *   1. Generates the explain output for the input plan excluding the 
subquery plans. 2. Generates
+   *      the explain output for each subquery referenced in the plan.
    */
+  // scalastyle:on
+  // spotless:on
   def processPlan[T <: QueryPlan[T]](
       plan: T,
       append: String => Unit,
-      collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): 
FallbackInfo = synchronized {
-    try {
-      // Initialize a reference-unique set of Operators to avoid accdiental 
overwrites and to allow
-      // intentional overwriting of IDs generated in previous AQE iteration
-      val operators = newSetFromMap[QueryPlan[_]](new util.IdentityHashMap())
-      // Initialize an array of ReusedExchanges to help find Adaptively 
Optimized Out
-      // Exchanges as part of SPARK-42753
-      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
+      collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): 
FallbackInfo =
+    synchronized {
+      SparkShimLoader.getSparkShims.withOperatorIdMap(
+        new java.util.IdentityHashMap[QueryPlan[_], Int]()) {
+        try {
+          // Initialize a reference-unique set of Operators to avoid 
accdiental overwrites and to
+          // allow intentional overwriting of IDs generated in previous AQE 
iteration
+          val operators = newSetFromMap[QueryPlan[_]](new 
util.IdentityHashMap())
+          // Initialize an array of ReusedExchanges to help find Adaptively 
Optimized Out
+          // Exchanges as part of SPARK-42753
+          val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
 
-      var currentOperatorID = 0
-      currentOperatorID =
-        generateOperatorIDs(plan, currentOperatorID, operators, 
reusedExchanges, true)
+          var currentOperatorID = 0
+          currentOperatorID =
+            generateOperatorIDs(plan, currentOperatorID, operators, 
reusedExchanges, true)
 
-      val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, 
BaseSubqueryExec)]
-      getSubqueries(plan, subqueries)
+          val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, 
BaseSubqueryExec)]
+          getSubqueries(plan, subqueries)
 
-      currentOperatorID = subqueries.foldLeft(currentOperatorID) {
-        (curId, plan) => generateOperatorIDs(plan._3.child, curId, operators, 
reusedExchanges, true)
-      }
+          currentOperatorID = subqueries.foldLeft(currentOperatorID) {
+            (curId, plan) =>
+              generateOperatorIDs(plan._3.child, curId, operators, 
reusedExchanges, true)
+          }
 
-      // SPARK-42753: Process subtree for a ReusedExchange with unknown child
-      val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
-      reusedExchanges.foreach {
-        reused =>
-          val child = reused.child
-          if (!operators.contains(child)) {
-            optimizedOutExchanges.append(child)
-            currentOperatorID =
-              generateOperatorIDs(child, currentOperatorID, operators, 
reusedExchanges, false)
+          // SPARK-42753: Process subtree for a ReusedExchange with unknown 
child
+          val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
+          reusedExchanges.foreach {
+            reused =>
+              val child = reused.child
+              if (!operators.contains(child)) {
+                optimizedOutExchanges.append(child)
+                currentOperatorID =
+                  generateOperatorIDs(child, currentOperatorID, operators, 
reusedExchanges, false)
+              }
           }
-      }
 
-      val collectedOperators = BitSet.empty
-      processPlanSkippingSubqueries(plan, append, collectedOperators)
+          val collectedOperators = BitSet.empty
+          processPlanSkippingSubqueries(plan, append, collectedOperators)
 
-      var i = 0
-      for (sub <- subqueries) {
-        if (i == 0) {
-          append("\n===== Subqueries =====\n\n")
-        }
-        i = i + 1
-        append(
-          s"Subquery:$i Hosting operator id = " +
-            s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
+          var i = 0
+          for (sub <- subqueries) {
+            if (i == 0) {
+              append("\n===== Subqueries =====\n\n")
+            }
+            i = i + 1
+            append(
+              s"Subquery:$i Hosting operator id = " +
+                s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
 
-        // For each subquery expression in the parent plan, process its child 
plan to compute
-        // the explain output. In case of subquery reuse, we don't print 
subquery plan more
-        // than once. So we skip [[ReusedSubqueryExec]] here.
-        if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
-          processPlanSkippingSubqueries(sub._3.child, append, 
collectedOperators)
-        }
-        append("\n")
-      }
+            // For each subquery expression in the parent plan, process its 
child plan to compute
+            // the explain output. In case of subquery reuse, we don't print 
subquery plan more
+            // than once. So we skip [[ReusedSubqueryExec]] here.
+            if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
+              processPlanSkippingSubqueries(sub._3.child, append, 
collectedOperators)
+            }
+            append("\n")
+          }
 
-      i = 0
-      optimizedOutExchanges.foreach {
-        exchange =>
-          if (i == 0) {
-            append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+          i = 0
+          optimizedOutExchanges.foreach {
+            exchange =>
+              if (i == 0) {
+                append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+              }
+              i = i + 1
+              append(s"Subplan:$i\n")
+              processPlanSkippingSubqueries[SparkPlan](exchange, append, 
collectedOperators)
+              append("\n")
           }
-          i = i + 1
-          append(s"Subplan:$i\n")
-          processPlanSkippingSubqueries[SparkPlan](exchange, append, 
collectedOperators)
-          append("\n")
-      }
 
-      
(subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ 
plan)
-        .map {
-          plan =>
-            if (collectFallbackFunc.isEmpty) {
-              collectFallbackNodes(plan)
-            } else {
-              collectFallbackFunc.get.apply(plan)
+          
(subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ 
plan)
+            .map {
+              plan =>
+                if (collectFallbackFunc.isEmpty) {
+                  collectFallbackNodes(plan)
+                } else {
+                  collectFallbackFunc.get.apply(plan)
+                }
             }
+            .reduce((a, b) => (a._1 + b._1, a._2 ++ b._2))
+        } finally {
+          removeTags(plan)
         }
-        .reduce((a, b) => (a._1 + b._1, a._2 ++ b._2))
-    } finally {
-      removeTags(plan)
+      }
     }
-  }
-  // scalastyle:on
-  // spotless:on
 
   /**
    * Traverses the supplied input plan in a bottom-up fashion and records the 
operator id via
@@ -288,7 +294,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       }
       visited.add(plan)
       currentOperationID += 1
-      plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
+      SparkShimLoader.getSparkShims.setOperatorId(plan, currentOperationID)
     }
 
     plan.foreachUp {
@@ -358,12 +364,12 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper 
{
    * value.
    */
   private def getOpId(plan: QueryPlan[_]): String = {
-    plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown")
+    SparkShimLoader.getSparkShims.getOperatorId(plan).map(v => 
s"$v").getOrElse("unknown")
   }
 
   private def removeTags(plan: QueryPlan[_]): Unit = {
     def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
-      p.unsetTagValue(QueryPlan.OP_ID_TAG)
+      SparkShimLoader.getSparkShims.unsetOperatorId(p)
       children.foreach(removeTags)
     }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 7f5f921501..4ecc674d4b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
-import org.apache.spark.sql.execution.GlutenExplainUtils._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
@@ -42,8 +41,8 @@ import scala.collection.mutable.ArrayBuffer
  * A helper class to get the Gluten fallback summary from a Spark [[Dataset]].
  *
  * Note that, if AQE is enabled, but the query is not materialized, then this 
method will re-plan
- * the query execution with disabled AQE. It is a workaround to get the final 
plan, and it may
- * cause the inconsistent results with a materialized query. However, we have 
no choice.
+ * the query execution with disabled AQE. It is a workaround to get the final 
plan, and it may cause
+ * the inconsistent results with a materialized query. However, we have no 
choice.
  *
  * For example:
  *
@@ -96,7 +95,9 @@ object GlutenImplicits {
     args.substring(index + "isFinalPlan=".length).trim.toBoolean
   }
 
-  private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]): 
FallbackInfo = {
+  private def collectFallbackNodes(
+      spark: SparkSession,
+      plan: QueryPlan[_]): GlutenExplainUtils.FallbackInfo = {
     var numGlutenNodes = 0
     val fallbackNodeToReason = new mutable.HashMap[String, String]
 
@@ -131,7 +132,7 @@ object GlutenImplicits {
                 spark,
                 newSparkPlan
               )
-              processPlan(
+              GlutenExplainUtils.processPlan(
                 newExecutedPlan,
                 new PlanStringConcat().append,
                 Some(plan => collectFallbackNodes(spark, plan)))
@@ -146,12 +147,15 @@ object GlutenImplicits {
           if (PlanUtil.isGlutenTableCache(i)) {
             numGlutenNodes += 1
           } else {
-            addFallbackNodeWithReason(i, "Columnar table cache is disabled", 
fallbackNodeToReason)
+            GlutenExplainUtils.addFallbackNodeWithReason(
+              i,
+              "Columnar table cache is disabled",
+              fallbackNodeToReason)
           }
           collect(i.relation.cachedPlan)
         case _: AQEShuffleReadExec => // Ignore
         case p: SparkPlan =>
-          handleVanillaSparkPlan(p, fallbackNodeToReason)
+          GlutenExplainUtils.handleVanillaSparkPlan(p, fallbackNodeToReason)
           p.innerChildren.foreach(collect)
         case _ =>
       }
@@ -181,10 +185,10 @@ object GlutenImplicits {
           // AQE is not materialized, so the columnar rules are not applied.
           // For this case, We apply columnar rules manually with disable AQE.
           val qe = spark.sessionState.executePlan(logicalPlan, 
CommandExecutionMode.SKIP)
-          processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
+          GlutenExplainUtils.processPlan(qe.executedPlan, concat.append, 
collectFallbackFunc)
         }
       } else {
-        processPlan(plan, concat.append, collectFallbackFunc)
+        GlutenExplainUtils.processPlan(plan, concat.append, 
collectFallbackFunc)
       }
       totalNumGlutenNodes += numGlutenNodes
       totalNumFallbackNodes += fallbackNodeToReason.size
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 7671f236c9..fba6a4a5a4 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, 
Expression}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -270,4 +271,18 @@ trait SparkShims {
   def extractExpressionArrayInsert(arrayInsert: Expression): Seq[Expression] = 
{
     throw new UnsupportedOperationException("ArrayInsert not supported.")
   }
+
+  /** Shim method for usages from GlutenExplainUtils.scala. */
+  def withOperatorIdMap[T](idMap: java.util.Map[QueryPlan[_], Int])(body: => 
T): T = {
+    body
+  }
+
+  /** Shim method for usages from GlutenExplainUtils.scala. */
+  def getOperatorId(plan: QueryPlan[_]): Option[Int]
+
+  /** Shim method for usages from GlutenExplainUtils.scala. */
+  def setOperatorId(plan: QueryPlan[_], opId: Int): Unit
+
+  /** Shim method for usages from GlutenExplainUtils.scala. */
+  def unsetOperatorId(plan: QueryPlan[_]): Unit
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index f62f9031ce..973e675fa9 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BinaryExpression, Expression, InputFileBlockLength, 
InputFileBlockStart, InputFileName}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
HashClusteredDistribution}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -283,4 +284,16 @@ class Spark32Shims extends SparkShims {
     val s = decimalType.scale
     DecimalType(p, if (toScale > s) s else toScale)
   }
+
+  override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
+    plan.getTagValue(QueryPlan.OP_ID_TAG)
+  }
+
+  override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
+    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+  }
+
+  override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
+    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+  }
 }
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 168b88275c..5cf7c5505a 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, 
RegrR2, TypedImperativeAggregate}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -364,4 +365,16 @@ class Spark33Shims extends SparkShims {
       RebaseSpec(LegacyBehaviorPolicy.CORRECTED)
     )
   }
+
+  override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
+    plan.getTagValue(QueryPlan.OP_ID_TAG)
+  }
+
+  override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
+    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+  }
+
+  override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
+    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+  }
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 558d7f60d5..bedad4c017 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -499,4 +500,16 @@ class Spark34Shims extends SparkShims {
     val expr = arrayInsert.asInstanceOf[ArrayInsert]
     Seq(expr.srcArrayExpr, expr.posExpr, expr.itemExpr, 
Literal(expr.legacyNegativeIndex))
   }
+
+  override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
+    plan.getTagValue(QueryPlan.OP_ID_TAG)
+  }
+
+  override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
+    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+  }
+
+  override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
+    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+  }
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 4a6590161c..d130864a9f 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -524,4 +525,16 @@ class Spark35Shims extends SparkShims {
     val expr = arrayInsert.asInstanceOf[ArrayInsert]
     Seq(expr.srcArrayExpr, expr.posExpr, expr.itemExpr, 
Literal(expr.legacyNegativeIndex))
   }
+
+  override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
+    plan.getTagValue(QueryPlan.OP_ID_TAG)
+  }
+
+  override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
+    plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
+  }
+
+  override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
+    plan.unsetTagValue(QueryPlan.OP_ID_TAG)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to