Copilot commented on code in PR #12236:
URL: https://github.com/apache/gluten/pull/12236#discussion_r3357400023


##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch 
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are 
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key, 
mappingFunction)`.
+ */

Review Comment:
   The scaladoc says this is an "LRU" cache, but 
`Caffeine.newBuilder.maximumSize(...)` uses Caffeine's size-bounded eviction 
policy (W-TinyLFU), not strict LRU. This wording could mislead readers about 
eviction behavior; consider describing it as a size-bounded cache instead.



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch 
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are 
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key, 
mappingFunction)`.
+ */
+final private[execution] class SchemaJsonInternCache {
+  import SchemaJsonInternCache._
+
+  private val encodeCache: Cache[StructType, Array[Byte]] =
+    Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]()
+
+  private val decodeCache: Cache[String, StructType] =
+    Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]()
+
+  /** Returns the canonical UTF-8 JSON byte form of `schema`. */
+  def encodeBytes(schema: StructType): Array[Byte] =
+    encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8))

Review Comment:
   `encodeBytes` returns the cached backing `Array[Byte]`. Because arrays are 
mutable, any accidental in-place modification by a caller would corrupt the 
cache and subsequent serializations. At minimum, document that the returned 
array must be treated as immutable (or return a defensive copy if you need 
stronger safety).



##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+/**
+ * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs 
yield byte-identical /
+ * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never 
corrupts later results;
+ * (3) concurrency -- contended get-or-compute yields correct results without 
exception.
+ */
+class SchemaJsonInternCacheSuite extends SparkFunSuite {
+
+  private def schemaOfWidth(n: Int): StructType =
+    StructType((0 until n).map(i => StructField(s"c$i", LongType, nullable = 
true)))
+
+  // === Invariant 1: determinism ===
+
+  test("encode is deterministic: same StructType => byte-identical output") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(10)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.sameElements(b), "encodeBytes must be deterministic for equal 
inputs")
+    // intern is a memoizer, not a transformer
+    val raw = s.json.getBytes(StandardCharsets.UTF_8)
+    assert(a.sameElements(raw), "encodeBytes(s) must equal 
s.json.getBytes(UTF_8)")
+  }
+
+  test("decode is deterministic: same bytes => structurally-equal StructType") 
{
+    val intern = new SchemaJsonInternCache
+    val s = StructType(Seq(
+      StructField("a", IntegerType),
+      StructField("b", StringType),
+      StructField("c", LongType, nullable = false)))
+    val bytes = s.json.getBytes(StandardCharsets.UTF_8)
+    val d1 = intern.decodeStructType(bytes)
+    val d2 = intern.decodeStructType(bytes)
+    assert(d1 == s)
+    assert(d2 == s)
+    // canonical-instance contract: equal bytes => same instance (saves 
repeated parse cost)
+    assert(d1.eq(d2), "decodeStructType must return the same canonical 
instance for equal bytes")
+  }
+
+  test("encode canonicality: same StructType returns the same byte array 
instance") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(5)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.eq(b), "encodeBytes must return the same canonical byte array for 
equal inputs")
+  }
+
+  // === Invariant 2: capacity ===
+
+  test("cap = 256 entries: eviction past cap does not corrupt later results") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val total = cap * 4 // 1024 distinct schemas, forces ~75% miss rate
+    val schemas = (0 until total).map(i => schemaOfWidth(8 + (i % 16)))
+    schemas.foreach(intern.encodeBytes)
+    schemas.zipWithIndex.foreach {
+      case (s, i) =>
+        val cached = intern.encodeBytes(s)
+        val raw = s.json.getBytes(StandardCharsets.UTF_8)
+        assert(
+          cached.sameElements(raw),
+          s"entry $i (width=${s.length}) was corrupted across eviction cycles")
+    }
+  }
+
+  test("decode under cap pressure: >= cap distinct bytes still all decode 
correctly") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val distinct = cap * 4
+    val pairs = (0 until distinct).map {
+      i =>
+        val s = schemaOfWidth(8 + (i % 16))
+        (s, s.json.getBytes(StandardCharsets.UTF_8))
+    }
+    // walk twice -- second walk hits a mix of evicted and live entries
+    pairs.foreach { case (_, bytes) => intern.decodeStructType(bytes) }
+    pairs.foreach {
+      case (s, bytes) =>
+        val decoded = intern.decodeStructType(bytes)
+        assert(decoded == s, s"decoded != expected for width=${s.length}")
+    }
+  }
+
+  // === Invariant 3: concurrency ===
+
+  test("concurrent get-or-compute: N threads on overlapping keys yields 
correct results") {
+    val intern = new SchemaJsonInternCache
+    val threads = 8
+    val keysPerThread = 200
+    val sharedKeySpace = 64 // overlap forces contention on same cache slots
+    val schemas = (0 until sharedKeySpace).map(i => schemaOfWidth(8 + (i % 
12)))
+
+    val pool = Executors.newFixedThreadPool(threads)
+    val start = new CountDownLatch(1)
+    val errors = new AtomicInteger(0)
+    val random = new Random(42)
+
+    val futures = (0 until threads).map {
+      tid =>
+        val rnd = new Random(random.nextLong())
+        pool.submit(new Runnable {
+          override def run(): Unit = {
+            start.await()
+            var i = 0
+            while (i < keysPerThread) {
+              val s = schemas(rnd.nextInt(sharedKeySpace))
+              try {
+                val enc = intern.encodeBytes(s)
+                val raw = s.json.getBytes(StandardCharsets.UTF_8)
+                if (!enc.sameElements(raw)) errors.incrementAndGet()
+
+                val dec = intern.decodeStructType(raw)
+                if (dec != s) errors.incrementAndGet()
+              } catch {
+                case _: Throwable => errors.incrementAndGet()
+              }
+              i += 1
+            }
+          }
+        })
+    }
+    start.countDown()
+    futures.foreach(_.get(60, TimeUnit.SECONDS))
+    pool.shutdown()
+    assert(pool.awaitTermination(10, TimeUnit.SECONDS), "thread pool did not 
terminate")
+    assert(
+      errors.get() == 0,
+      s"${errors.get()} concurrent get-or-compute errors out of ${threads * 
keysPerThread} ops")
+  }

Review Comment:
   If `futures.foreach(_.get(...))` throws (timeout / execution error), the 
thread pool won't be shut down, which can leak non-daemon threads and hang the 
rest of the test suite/CI JVM. Use a try/finally to ensure `shutdown()` / 
`awaitTermination()` always run.



##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends 
SqlBasedBenchmark {
     }
   }
 
+  // 
============================================================================
+  // Schema-codec intern microbench (SchemaJsonInternCache).
+  //
+  // ColumnarCachedBatchSerializer hot paths call StructType.json on every 
batch
+  // write and DataType.fromJson on every batch read. The intern cache memoizes
+  // the round-trip without changing the wire format. Sections below compare 
two
+  // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+  // (intern memoized round-trip), with no toggle on the cache class itself.
+  // 
============================================================================
+
+  private val INTERN_CAP = 256

Review Comment:
   `INTERN_CAP` duplicates the cache capacity constant. If 
`SchemaJsonInternCache`'s CAP ever changes, this benchmark will silently go out 
of sync. Consider referencing `SchemaJsonInternCache.CAP` (after making it 
`private[execution]`) instead of hard-coding 256 here.



##########
benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt:
##########
@@ -1,28 +1,162 @@
-OpenJDK 64-Bit Server VM 17.0.18+8-Ubuntu-124.04.1 on Linux 
6.6.87.2-microsoft-standard-WSL2
+OpenJDK 64-Bit Server VM 17.0.18+8 on Linux 6.6.87.2-microsoft-standard-WSL2
 AMD EPYC 7763 64-Core Processor
 table cache build:                        Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-partitionStats off                               126425         138565       
10546          0.8        1264.3       1.0X
-partitionStats on                                131431         137094        
7581          0.8        1314.3       1.0X
+partitionStats off                               127107         127856         
668          0.8        1271.1       1.0X
+partitionStats on                                134398         146067       
10193          0.7        1344.0       0.9X

Review Comment:
   This benchmark-results file now has leading line numbers and `|` separators 
on every line. If the intent is to check in raw Spark `Benchmark` output for 
easy diffing, these prefixes make comparisons noisier and suggest the content 
may have been passed through a formatter like `nl`. Consider 
regenerating/committing the plain output format if available.



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch 
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are 
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key, 
mappingFunction)`.
+ */
+final private[execution] class SchemaJsonInternCache {
+  import SchemaJsonInternCache._
+
+  private val encodeCache: Cache[StructType, Array[Byte]] =
+    Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]()
+
+  private val decodeCache: Cache[String, StructType] =
+    Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]()
+
+  /** Returns the canonical UTF-8 JSON byte form of `schema`. */
+  def encodeBytes(schema: StructType): Array[Byte] =
+    encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8))
+
+  /** Returns the canonical [[StructType]] parsed from `bytes` (UTF-8 JSON). */
+  def decodeStructType(bytes: Array[Byte]): StructType = {
+    val key = new String(bytes, StandardCharsets.UTF_8)
+    decodeCache.get(key, k => DataType.fromJson(k).asInstanceOf[StructType])
+  }
+}
+
+private[execution] object SchemaJsonInternCache {
+  // 256 entries: <= ~8.5 MB retained even at 1000-field schemas (~33 KB JSON 
each). Verified by
+  // Section C working-set sweep of the FU-D7 bench harness; revisit if C1/C2 
gates fail.
+  private val CAP = 256L

Review Comment:
   The CAP comment references an undefined "FU-D7" harness and the memory 
estimate appears to only account for one side of the cache. Since both encode 
(bytes) and decode (String key + StructType) caches retain data, the bound is 
higher than ~8.5 MB in the worst case. Also, hard-coding `256` across 
tests/benchmarks is easier to keep consistent if `CAP` is package-visible.



##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends 
SqlBasedBenchmark {
     }
   }
 
+  // 
============================================================================
+  // Schema-codec intern microbench (SchemaJsonInternCache).
+  //
+  // ColumnarCachedBatchSerializer hot paths call StructType.json on every 
batch
+  // write and DataType.fromJson on every batch read. The intern cache memoizes
+  // the round-trip without changing the wire format. Sections below compare 
two
+  // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+  // (intern memoized round-trip), with no toggle on the cache class itself.
+  // 
============================================================================
+
+  private val INTERN_CAP = 256
+
+  private def schemaFixture(numCols: Int, nameLen: Int): StructType = {
+    val name = "c" + ("x" * math.max(0, nameLen - 1))
+    StructType(
+      (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable = 
true)))
+  }
+
+  // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name 
shape.
+  private def realisticSchema: StructType = StructType(
+    Seq(
+      StructField("ss_sold_date_sk", IntegerType),
+      StructField("ss_sold_time_sk", IntegerType),
+      StructField("ss_item_sk", IntegerType),
+      StructField("ss_customer_sk", IntegerType),
+      StructField("ss_cdemo_sk", IntegerType),
+      StructField("ss_hdemo_sk", IntegerType),
+      StructField("ss_addr_sk", IntegerType),
+      StructField("ss_store_sk", IntegerType),
+      StructField("ss_promo_sk", IntegerType),
+      StructField("ss_ticket_number", LongType),
+      StructField("ss_quantity", IntegerType),
+      StructField("ss_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_list_price", DecimalType(7, 2)),
+      StructField("ss_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_discount_amt", DecimalType(7, 2)),
+      StructField("ss_ext_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_ext_list_price", DecimalType(7, 2)),
+      StructField("ss_ext_tax", DecimalType(7, 2)),
+      StructField("ss_coupon_amt", DecimalType(7, 2)),
+      StructField("ss_net_paid", DecimalType(7, 2)),
+      StructField("ss_net_paid_inc_tax", DecimalType(7, 2)),
+      StructField("ss_net_profit", DecimalType(7, 2))
+    ))
+
+  private val internSchemas: Seq[(String, StructType)] =
+    (for {
+      width <- Seq(10, 100, 1000)
+      nameLen <- Seq(1, 32)
+    } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+
+      ("tpcds-store_sales-23col" -> realisticSchema)
+
+  private def runInternEncode(label: String, schema: StructType): Unit = {
+    val N = 1L * 1000 * 1000
+    val intern = new SchemaJsonInternCache
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw schema.json.getBytes per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = schema.json.getBytes(StandardCharsets.UTF_8)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on  (intern.encodeBytes: cached canonical bytes)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = intern.encodeBytes(schema)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  private def runInternDecode(label: String, schema: StructType): Unit = {
+    val N = 1L * 100 * 1000
+    val intern = new SchemaJsonInternCache
+    val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8)
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw DataType.fromJson per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = DataType
+            .fromJson(new String(jsonBytes, StandardCharsets.UTF_8))
+            .asInstanceOf[StructType]
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on  (intern.decodeStructType: cached canonical 
StructType)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = intern.decodeStructType(jsonBytes)
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  // Working-set sweep across three regimes around cap = 256:
+  //   C1 == cap     -> 100% hit steady state
+  //   C2 == 2 x cap -> eviction pressure, partial hit
+  //   C3 == 4 x cap -> worst-case round-robin, ~all miss
+  // Gates (read at results-read time):
+  //   C1 on must be >= off; C2 on within 1.5x of off; C3 documented as known 
regression.

Review Comment:
   These lines describe performance "gates", but the benchmark doesn't actually 
enforce any thresholds—it's just emitting numbers. Consider rewording to make 
it clear this is a manual interpretation guideline (or remove the gating 
language).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to