iemejia commented on code in PR #12390:
URL: https://github.com/apache/gluten/pull/12390#discussion_r3491500543


##########
backends-velox/src-delta33/test/scala/org/apache/spark/sql/execution/benchmark/DeltaPlanningBenchmark.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
+import org.apache.gluten.extension.DeltaPostTransformRules
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.DeltaLog
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Benchmark for Delta Lake planning-time operations in Gluten.
+ *
+ * Measures two hot paths that our performance optimizations target:
+ *
+ *   1. '''DV Materialization''' (`DeltaDeletionVectorScanInfo.normalize`): 
resolves table paths,
+ *      loads DV bitmaps from storage, and serializes them into split 
metadata. Our optimizations
+ *      (caching table path, Hadoop conf, DV store across files) target this 
path.
+ *   2. '''Post-transform rule application''' 
(`DeltaPostTransformRules.rules`): traverses the
+ *      physical plan to strip DV synthetic columns, push down 
input_file_name, and apply column
+ *      mapping. Our optimizations (early-exit guard, shallow child check, 
pre-computed names,
+ *      batched attribute mapping) target this path.
+ *
+ * To run:
+ * {{{
+ *   bin/spark-submit --class 
org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark \
+ *     --jars <spark-core-test-jar> <gluten-backends-velox-jar>
+ * }}}

Review Comment:
   Fixed. Changed to comma-separated format: `--jars 
<spark-core-test-jar>,<gluten-backends-velox-jar>`.



##########
backends-velox/src-delta33/test/scala/org/apache/spark/sql/execution/benchmark/DeltaPlanningBenchmark.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
+import org.apache.gluten.extension.DeltaPostTransformRules
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.DeltaLog
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Benchmark for Delta Lake planning-time operations in Gluten.
+ *
+ * Measures two hot paths that our performance optimizations target:
+ *
+ *   1. '''DV Materialization''' (`DeltaDeletionVectorScanInfo.normalize`): 
resolves table paths,
+ *      loads DV bitmaps from storage, and serializes them into split 
metadata. Our optimizations
+ *      (caching table path, Hadoop conf, DV store across files) target this 
path.
+ *   2. '''Post-transform rule application''' 
(`DeltaPostTransformRules.rules`): traverses the
+ *      physical plan to strip DV synthetic columns, push down 
input_file_name, and apply column
+ *      mapping. Our optimizations (early-exit guard, shallow child check, 
pre-computed names,
+ *      batched attribute mapping) target this path.
+ *
+ * To run:
+ * {{{
+ *   bin/spark-submit --class 
org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark \
+ *     --jars <spark-core-test-jar> <gluten-backends-velox-jar>
+ * }}}
+ *
+ * Or via Maven (from the backends-velox module):
+ * {{{
+ *   ./build/mvn test -pl backends-velox -Pspark-3.5 -Pbackends-velox -Pdelta 
-Pjava-17 \
+ *     -Dtest=none -DfailIfNoTests=false \
+ *     
-Dsuites="org.apache.spark.sql.execution.benchmark.DeltaPlanningBenchmark"
+ * }}}
+ */
+object DeltaPlanningBenchmark extends SqlBasedBenchmark {
+
+  override def getSparkSession: SparkSession = {
+    SparkSession
+      .builder()
+      .master("local[1]")
+      .appName("DeltaPlanningBenchmark")
+      .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
+      .config(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .config("spark.memory.offHeap.enabled", "true")
+      .config("spark.memory.offHeap.size", "1024MB")
+      .config("spark.ui.enabled", "false")
+      .config("spark.default.parallelism", "1")
+      .getOrCreate()
+  }
+
+  private val numFiles =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.delta.numFiles", 
100)
+  private val rowsPerFile =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.delta.rowsPerFile", 
10000)
+  private val benchmarkIters =
+    spark.sparkContext.conf.getInt("spark.gluten.benchmark.iterations", 5)
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runDvMaterializationBenchmark()
+    runPostTransformRulesBenchmark()
+    runNonDeltaRulesOverheadBenchmark()
+  }
+
+  /**
+   * Benchmarks DeltaDeletionVectorScanInfo.normalize() -- the critical path 
that loads DVs from
+   * storage on the driver. Measures how caching table path + DV store reduces 
overhead.
+   */
+  private def runDvMaterializationBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      s"DV Materialization (normalize) - $numFiles files",
+      numFiles.toLong,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withDeltaTableWithDVs(numFiles, rowsPerFile) {
+      (path, partitionedFiles) =>
+        benchmark.addCase(s"normalize() - $numFiles DV files", benchmarkIters) 
{
+          _ =>
+            DeltaDeletionVectorScanInfo.normalize(
+              partitionColumnCount = 0,
+              partitionFiles = partitionedFiles)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks DeltaPostTransformRules application on a Delta plan with DV 
columns. Measures the
+   * combined cost of DV stripping, input_file pushdown, and column mapping.
+   */
+  private def runPostTransformRulesBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Post-transform rules (Delta plan)",
+      1L,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withDeltaTableWithDVs(numFiles = 10, rowsPerFile = 1000) {
+      (path, _) =>
+        val df = spark.read.format("delta").load(path)
+        // Force planning to get the executed plan with DeltaScanTransformer
+        val plan = df.queryExecution.executedPlan
+
+        benchmark.addCase("apply rules (Delta plan with DV)", benchmarkIters) {
+          _ =>
+            val result = DeltaPostTransformRules.rules.foldLeft(plan) {
+              (p, rule) => rule(p)
+            }
+            // Prevent dead code elimination
+            assert(result != null)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks post-transform rules on a non-Delta plan to verify zero 
overhead from the early-exit
+   * guard. This is the "control" case showing that non-Delta queries don't 
pay for Delta rule
+   * traversals.
+   */
+  private def runNonDeltaRulesOverheadBenchmark(): Unit = {
+    val benchmark = new Benchmark(
+      "Post-transform rules (non-Delta plan)",
+      1L,
+      minNumIters = benchmarkIters,
+      output = output)
+
+    withTempPath {
+      p =>
+        // Create a parquet table (not Delta)
+        val path = p.getCanonicalPath
+        spark
+          .range(0, 100000, 1, numPartitions = 10)
+          .selectExpr("id", "id * 2 as value", "cast(id as string) as name")
+          .write
+          .parquet(path)
+
+        val df = spark.read.parquet(path)
+        val plan = df.queryExecution.executedPlan
+
+        benchmark.addCase("apply rules (non-Delta parquet plan)", 
benchmarkIters) {
+          _ =>
+            val result = DeltaPostTransformRules.rules.foldLeft(plan) {
+              (p, rule) => rule(p)
+            }
+            assert(result != null)
+        }
+
+        benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmarks the delta40 `parseDescriptor` optimization: cached reflective 
method lookup vs
+   * uncached (resolving `getMethod` on every call). Simulates the pattern 
used in
+   * `DeltaDeletionVectorScanInfo` for Delta 4.0 API compatibility.
+   *
+   * The cached version resolves the Method object once (lazy val); the 
uncached version calls
+   * `getMethod` + `invoke` per descriptor, which is what the old code did 
per-file.
+   */
+  /**
+   * Creates a Delta table with deletion vectors and provides the partitioned 
files for direct DV
+   * materialization benchmarking.
+   */

Review Comment:
   Fixed. Removed the orphaned `parseDescriptor` benchmark Scaladoc block.



##########
gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala:
##########
@@ -154,24 +169,33 @@ object DeltaDeletionVectorScanInfo {
     }
   }
 
-  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+  /** Cached reflective method for parsing DV descriptors (Delta 4.0 API 
compatibility). */
+  private lazy val descriptorParseMethod: java.lang.reflect.Method = {
     val methods = Seq("deserializeFromBase64", "fromJson")
     methods.iterator
-      .map {
+      .flatMap {
         methodName =>
-          Try {
-            val method = 
DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String])
-            method
-              .invoke(DeletionVectorDescriptor, encodedDescriptor)
-              .asInstanceOf[DeletionVectorDescriptor]
-          }.toOption
+          Try(DeletionVectorDescriptor.getClass.getMethod(methodName, 
classOf[String])).toOption
       }
-      .collectFirst { case Some(descriptor) => descriptor }
+      .nextOption()
       .getOrElse {
-        throw new IllegalArgumentException("Unable to parse Delta deletion 
vector descriptor")
+        throw new IllegalStateException(
+          "Unable to find DeletionVectorDescriptor parse method (tried: " +
+            methods.mkString(", ") + ")")
       }
   }
 
+  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+    try {
+      descriptorParseMethod
+        .invoke(DeletionVectorDescriptor, encodedDescriptor)
+        .asInstanceOf[DeletionVectorDescriptor]

Review Comment:
   Fixed. Changed `descriptorParseMethod` (single cached method) to 
`descriptorParseMethods` (all available methods cached as a `Seq`). The 
`parseDescriptor` method now tries each cached method in order, preserving 
fallback semantics while still avoiding per-call `getMethod` lookups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to