Copilot commented on code in PR #12390:
URL: https://github.com/apache/gluten/pull/12390#discussion_r3490414238


##########
gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala:
##########
@@ -62,10 +64,20 @@ object DeltaDeletionVectorScanInfo {
    * Materializes per-file Delta DV read options for a split, alongside each 
file's metadata with
    * the DV bookkeeping keys stripped. Returns None when no file in the split 
carries a deletion
    * vector, so callers can keep the generic split representation.
+   *
+   * Performance: resolves the table path once (using the first file) and 
reuses a single Hadoop
+   * Configuration instance across all files in the partition to avoid 
redundant filesystem I/O and
+   * object allocation.
    */
   def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
       : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
-    val scanInfos = extractAll(activeSparkSession, partitionColumnCount, 
partitionFiles)
+    val spark = activeSparkSession
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val cachedTablePath = resolveTablePath(hadoopConf, partitionColumnCount, 
partitionFiles.head)

Review Comment:
   normalize() now calls partitionFiles.head unconditionally. If Spark ever 
produces an empty FilePartition (or a caller passes an empty Seq), this will 
throw NoSuchElementException. Returning None for empty input preserves the 
previous behavior and makes the method defensive.



##########
gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala:
##########
@@ -794,4 +796,67 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
       checkAnswer(df, Row(0, null) :: Row(101, Seq(Row("a", 1), null)) :: Nil)
     }
   }
+
+  test("post-transform rules are no-op on non-Delta plans") {
+    withTempPath {
+      p =>
+        val path = p.getCanonicalPath
+        spark.range(100).selectExpr("id", "id * 2 as 
value").write.parquet(path)
+        val df = spark.read.parquet(path)
+        val plan = df.queryExecution.executedPlan
+
+        // Rules should return the plan unchanged (early-exit guard)
+        val transformed = DeltaPostTransformRules.rules.foldLeft(plan) {
+          (p, rule) => rule(p)
+        }
+        // No DeltaScanTransformer in the plan, so rules should be identity
+        assert(
+          !transformed.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Non-Delta plan should not contain DeltaScanTransformer")
+        assert(
+          !plan.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Original plan should not contain DeltaScanTransformer")

Review Comment:
   This test claims post-transform rules are a no-op on non-Delta plans, but it 
applies the full DeltaPostTransformRules.rules sequence (which includes 
RemoveTransitions) and doesn't assert identity. If the intent is to validate 
the new early-exit guard, apply only the Delta-specific rule(s) (e.g., 
rules.tail) and assert the plan object is unchanged.



##########
gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala:
##########
@@ -154,24 +169,33 @@ object DeltaDeletionVectorScanInfo {
     }
   }
 
-  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+  /** Cached reflective method for parsing DV descriptors (Delta 4.0 API 
compatibility). */
+  private lazy val descriptorParseMethod: java.lang.reflect.Method = {
     val methods = Seq("deserializeFromBase64", "fromJson")
     methods.iterator
-      .map {
+      .flatMap {
         methodName =>
-          Try {
-            val method = 
DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String])
-            method
-              .invoke(DeletionVectorDescriptor, encodedDescriptor)
-              .asInstanceOf[DeletionVectorDescriptor]
-          }.toOption
+          Try(DeletionVectorDescriptor.getClass.getMethod(methodName, 
classOf[String])).toOption
       }
-      .collectFirst { case Some(descriptor) => descriptor }
+      .nextOption()
       .getOrElse {
-        throw new IllegalArgumentException("Unable to parse Delta deletion 
vector descriptor")
+        throw new IllegalStateException(
+          "Unable to find DeletionVectorDescriptor parse method (tried: " +
+            methods.mkString(", ") + ")")
       }
   }
 
+  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+    try {
+      descriptorParseMethod
+        .invoke(DeletionVectorDescriptor, encodedDescriptor)
+        .asInstanceOf[DeletionVectorDescriptor]

Review Comment:
   Delta 4.0 descriptor parsing: parseDescriptor always invokes the single 
cached descriptorParseMethod. If the first-found method exists but cannot parse 
a particular encodedDescriptor format, the code will now fail instead of 
falling back to the alternative method (as the previous per-call try-both logic 
did). Consider caching all available parse methods and trying them in order per 
call.



##########
backends-velox/src-delta33/test/scala/org/apache/spark/sql/execution/benchmark/DeltaPlanningBenchmark.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
+import org.apache.gluten.extension.DeltaPostTransformRules
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.DeltaLog
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Benchmark for Delta Lake planning-time operations in Gluten.
+ *
+ * Measures two hot paths that our performance optimizations target:
+ *
+ *   1. '''DV Materialization''' (`DeltaDeletionVectorScanInfo.normalize`): 
resolves table paths,
+ *      loads DV bitmaps from storage, and serializes them into split 
metadata. Our optimizations
+ *      (caching table path, Hadoop conf, DV store across files) target this 
path.
+ *   2. '''Post-transform rule application''' 
(`DeltaPostTransformRules.rules`): traverses the
+ *      physical plan to strip DV synthetic columns, push down 
input_file_name, and apply column
+ *      mapping. Our optimizations (early-exit guard, shallow child check, 
pre-computed names,
+ *      batched attribute mapping) target this path.
+ *
+ * To run:
+ * {{{
+ *   bin/spark-submit --class 
org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark \
+ *     --jars <spark-core-test-jar> <gluten-backends-velox-jar>
+ * }}}

Review Comment:
   The spark-submit example uses `--jars <jar1> <jar2>`, but spark-submit 
expects a single comma-separated argument for --jars; otherwise the second jar 
is treated as an application argument. Update the doc snippet so it can be 
copy/pasted reliably.



##########
gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala:
##########
@@ -61,10 +63,23 @@ object DeltaDeletionVectorScanInfo {
    * Materializes per-file Delta DV read options for a split, alongside each 
file's metadata with
    * the DV bookkeeping keys stripped. Returns None when no file in the split 
carries a deletion
    * vector, so callers can keep the generic split representation.
+   *
+   * Performance: resolves the table path once (using the first file) and 
reuses a single Hadoop
+   * Configuration instance across all files in the partition to avoid 
redundant filesystem I/O and
+   * object allocation.
    */
   def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
       : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
-    val scanInfos = extractAll(activeSparkSession, partitionColumnCount, 
partitionFiles)
+    val spark = activeSparkSession
+    // Create a single Hadoop Configuration for the entire partition.
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    // Resolve table path once using the first file -- all files in a Delta 
table share the same
+    // root, so this avoids N-1 redundant filesystem existence checks.
+    val cachedTablePath = resolveTablePath(hadoopConf, partitionColumnCount, 
partitionFiles.head)

Review Comment:
   normalize() now calls partitionFiles.head unconditionally. If Spark ever 
produces an empty FilePartition (or a caller passes an empty Seq), this will 
throw NoSuchElementException. Returning None for empty input preserves the 
previous behavior and makes the method defensive.



##########
gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala:
##########
@@ -794,4 +796,67 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
       checkAnswer(df, Row(0, null) :: Row(101, Seq(Row("a", 1), null)) :: Nil)
     }
   }
+
+  test("post-transform rules are no-op on non-Delta plans") {
+    withTempPath {
+      p =>
+        val path = p.getCanonicalPath
+        spark.range(100).selectExpr("id", "id * 2 as 
value").write.parquet(path)
+        val df = spark.read.parquet(path)
+        val plan = df.queryExecution.executedPlan
+
+        // Rules should return the plan unchanged (early-exit guard)
+        val transformed = DeltaPostTransformRules.rules.foldLeft(plan) {
+          (p, rule) => rule(p)
+        }
+        // No DeltaScanTransformer in the plan, so rules should be identity
+        assert(
+          !transformed.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Non-Delta plan should not contain DeltaScanTransformer")
+        assert(
+          !plan.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Original plan should not contain DeltaScanTransformer")
+    }
+  }
+
+  test("post-transform rules produce DeltaScanTransformer for Delta tables") {

Review Comment:
   This test doesn't exercise post-transform rule application; it only checks 
that the executed plan already contains DeltaScanTransformer. Either rename the 
test to reflect what it's actually verifying, or apply 
DeltaPostTransformRules.rules and assert on the transformed plan.



##########
gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala:
##########
@@ -794,4 +796,67 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
       checkAnswer(df, Row(0, null) :: Row(101, Seq(Row("a", 1), null)) :: Nil)
     }
   }
+
+  test("post-transform rules are no-op on non-Delta plans") {
+    withTempPath {
+      p =>
+        val path = p.getCanonicalPath
+        spark.range(100).selectExpr("id", "id * 2 as 
value").write.parquet(path)
+        val df = spark.read.parquet(path)
+        val plan = df.queryExecution.executedPlan
+
+        // Rules should return the plan unchanged (early-exit guard)
+        val transformed = DeltaPostTransformRules.rules.foldLeft(plan) {
+          (p, rule) => rule(p)
+        }
+        // No DeltaScanTransformer in the plan, so rules should be identity
+        assert(
+          !transformed.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Non-Delta plan should not contain DeltaScanTransformer")
+        assert(
+          !plan.exists(_.isInstanceOf[DeltaScanTransformer]),
+          "Original plan should not contain DeltaScanTransformer")
+    }
+  }
+
+  test("post-transform rules produce DeltaScanTransformer for Delta tables") {
+    withTempPath {
+      p =>
+        import testImplicits._
+        val path = p.getCanonicalPath
+        Seq(1, 2, 3, 4, 
5).toDF("id").coalesce(1).write.format("delta").save(path)
+        val df = spark.read.format("delta").load(path)
+        val plan = df.queryExecution.executedPlan
+
+        // Delta scan should be offloaded to DeltaScanTransformer
+        val deltaScans = plan.collect { case s: DeltaScanTransformer => s }
+        assert(deltaScans.nonEmpty, "Delta plan should contain 
DeltaScanTransformer")
+    }
+  }
+
+  test("scanFilters returns consistent results on repeated access") {
+    withTempPath {
+      p =>
+        import testImplicits._
+        val path = p.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+        val df = spark.read.format("delta").load(path).where("id > 1")
+        val plan = df.queryExecution.executedPlan
+        val scans = plan.collect { case s: DeltaScanTransformer => s }
+
+        if (scans.nonEmpty) {
+          val scan = scans.head
+          // scanFilters is now a lazy val; repeated calls should return the 
same instance
+          val first = scan.scanFilters
+          val second = scan.scanFilters
+          val third = scan.scanFilters
+          assert(first eq second, "scanFilters should return the same cached 
instance")
+          assert(second eq third, "scanFilters should return the same cached 
instance")
+        }

Review Comment:
   This test is guarded by `if (scans.nonEmpty)`, so it will silently pass if 
DeltaScanTransformer offloading stops happening (exactly the kind of regression 
the test is meant to catch). It should assert scans.nonEmpty and then perform 
the lazy-val caching checks.



##########
backends-velox/src-delta33/test/scala/org/apache/spark/sql/execution/benchmark/DeltaPlanningBenchmark.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
+import org.apache.gluten.extension.DeltaPostTransformRules
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.DeltaLog
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Benchmark for Delta Lake planning-time operations in Gluten.
+ *
+ * Measures two hot paths that our performance optimizations target:
+ *
+ *   1. '''DV Materialization''' (`DeltaDeletionVectorScanInfo.normalize`): 
resolves table paths,
+ *      loads DV bitmaps from storage, and serializes them into split 
metadata. Our optimizations
+ *      (caching table path, Hadoop conf, DV store across files) target this 
path.
+ *   2. '''Post-transform rule application''' 
(`DeltaPostTransformRules.rules`): traverses the
+ *      physical plan to strip DV synthetic columns, push down 
input_file_name, and apply column
+ *      mapping. Our optimizations (early-exit guard, shallow child check, 
pre-computed names,
+ *      batched attribute mapping) target this path.
+ *
+ * To run:
+ * {{{
+ *   bin/spark-submit --class 
org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark \
+ *     --jars <spark-core-test-jar> <gluten-backends-velox-jar>
+ * }}}
+ *
+ * Or via Maven (from the backends-velox module):
+ * {{{
+ *   ./build/mvn test -pl backends-velox -Pspark-3.5 -Pbackends-velox -Pdelta 
-Pjava-17 \
+ *     -Dtest=none -DfailIfNoTests=false \
+ *     
-Dsuites="org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark"
+ * }}}
+ */
+object DeltaPlanningBenchmark extends SqlBasedBenchmark {
+
+  override def getSparkSession: SparkSession = {
+    SparkSession
+      .builder()
+      .master("local[1]")
+      .appName("DeltaPlanningBenchmark")
+      .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
+      .config(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .config("spark.memory.offHeap.enabled", "true")
+      .config("spark.memory.offHeap.size", "1024MB")
+      .config("spark.ui.enabled", "false")
+      .config("spark.default.parallelism", "1")
+      .getOrCreate()
+  }
+
+  private val numFiles =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.delta.numFiles", 
100)
+  private val rowsPerFile =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.delta.rowsPerFile", 
10000)
+  private val benchmarkIters =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.iterations", 5)
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runDvMaterializationBenchmark()
+    runPostTransformRulesBenchmark()
+    runNonDeltaRulesOverheadBenchmark()
+  }
+
+  /**
+   * Benchmarks DeltaDeletionVectorScanInfo.normalize() -- the critical path 
that loads DVs from
+   * storage on the driver. Measures how caching table path + DV store reduces 
overhead.
+   */
+  private def runDvMaterializationBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      s"DV Materialization (normalize) - $numFiles files",
+      numFiles.toLong,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withDeltaTableWithDVs(numFiles, rowsPerFile) {
+      (path, partitionedFiles) =>
+        benchmark.addCase(s"normalize() - $numFiles DV files", benchmarkIters) 
{
+          _ =>
+            DeltaDeletionVectorScanInfo.normalize(
+              partitionColumnCount = 0,
+              partitionFiles = partitionedFiles)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks DeltaPostTransformRules application on a Delta plan with DV 
columns. Measures the
+   * combined cost of DV stripping, input_file pushdown, and column mapping.
+   */
+  private def runPostTransformRulesBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Post-transform rules (Delta plan)",
+      1L,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withDeltaTableWithDVs(numFiles = 10, rowsPerFile = 1000) {
+      (path, _) =>
+        val df = spark.read.format("delta").load(path)
+        // Force planning to get the executed plan with DeltaScanTransformer
+        val plan = df.queryExecution.executedPlan
+
+        benchmark.addCase("apply rules (Delta plan with DV)", benchmarkIters) {
+          _ =>
+            val result = DeltaPostTransformRules.rules.foldLeft(plan) {
+              (p, rule) => rule(p)
+            }
+            // Prevent dead code elimination
+            assert(result != null)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks post-transform rules on a non-Delta plan to verify zero 
overhead from the early-exit
+   * guard. This is the "control" case showing that non-Delta queries don't 
pay for Delta rule
+   * traversals.
+   */
+  private def runNonDeltaRulesOverheadBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Post-transform rules (non-Delta plan)",
+      1L,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withTempPath {
+      p =>
+        // Create a parquet table (not Delta)
+        val path = p.getCanonicalPath
+        spark
+          .range(0, 100000, 1, numPartitions = 10)
+          .selectExpr("id", "id * 2 as value", "cast(id as string) as name")
+          .write
+          .parquet(path)
+
+        val df = spark.read.parquet(path)
+        val plan = df.queryExecution.executedPlan
+
+        benchmark.addCase("apply rules (non-Delta parquet plan)", 
benchmarkIters) {
+          _ =>
+            val result = DeltaPostTransformRules.rules.foldLeft(plan) {
+              (p, rule) => rule(p)
+            }
+            assert(result != null)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks the delta40 `parseDescriptor` optimization: cached reflective 
method lookup vs
+   * uncached (resolving `getMethod` on every call). Simulates the pattern 
used in
+   * `DeltaDeletionVectorScanInfo` for Delta 4.0 API compatibility.
+   *
+   * The cached version resolves the Method object once (lazy val); the 
uncached version calls
+   * `getMethod` + `invoke` per descriptor, which is what the old code did 
per-file.
+   */
+  /**
+   * Creates a Delta table with deletion vectors and provides the partitioned 
files for direct DV
+   * materialization benchmarking.
+   */

Review Comment:
   There is a Scaladoc block describing a `parseDescriptor` benchmark, but no 
such benchmark is implemented (it's immediately followed by the 
withDeltaTableWithDVs doc). This looks like a leftover and can confuse readers; 
either implement that benchmark or remove the orphaned Scaladoc.



##########
gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java:
##########
@@ -96,6 +96,9 @@ public enum ReadFileFormat {
   /**
    * Copies an existing node, replacing its per-file extra metadata. Lets 
data-lake subclasses
    * decorate a generically built node without re-deriving the file listing.
+   *
+   * <p>Note: uses direct list reference transfer (not deep copy) for 
efficiency, since the original
+   * node is typically discarded immediately after this constructor returns.

Review Comment:
   The constructor comment says it "uses direct list reference transfer", but 
the implementation calls addAll() into this.otherMetadataColumns, which copies 
the list (shallow) rather than transferring the list reference. This is 
misleading for readers trying to reason about aliasing and mutability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to