yaooqinn commented on code in PR #12236:
URL: https://github.com/apache/gluten/pull/12236#discussion_r3366449098


##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+/**
+ * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs 
yield byte-identical /
+ * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never 
corrupts later results;
+ * (3) concurrency -- contended get-or-compute yields correct results without 
exception.

Review Comment:
   Adopted in 0f2e4d9054 — suite scaladoc updated to "size-bounded cap = 256". 
(The production class scaladoc was already fixed in 8470627513; this was a 
missed second occurrence in the test file.)



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch 
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are 
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key, 
mappingFunction)`.
+ */

Review Comment:
   Already addressed in 8470627513 — `SchemaJsonInternCache.scala` L26-29 now 
reads "size-bounded Caffeine cache (W-TinyLFU); eviction recomputes via the 
same pure codec, so misses are indistinguishable from the no-cache baseline." 
Looks like this re-review was generated against a stale snapshot.



##########
benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt:
##########
@@ -1,28 +1,162 @@
-OpenJDK 64-Bit Server VM 17.0.18+8-Ubuntu-124.04.1 on Linux 
6.6.87.2-microsoft-standard-WSL2
+OpenJDK 64-Bit Server VM 17.0.18+8 on Linux 6.6.87.2-microsoft-standard-WSL2
 AMD EPYC 7763 64-Core Processor
 table cache build:                        Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-partitionStats off                               126425         138565       
10546          0.8        1264.3       1.0X
-partitionStats on                                131431         137094        
7581          0.8        1314.3       1.0X
+partitionStats off                               127107         127856         
668          0.8        1271.1       1.0X
+partitionStats on                                134398         146067       
10193          0.7        1344.0       0.9X

Review Comment:
   This is still a false positive — the file does not have any `<N>|` prefix. 
`xxd benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt | head 
-1` confirms byte 0 = 'O' (start of "OpenJDK 64-Bit Server VM"). The leading 
numbers visible in the GitHub diff viewer are the diff line-number gutter, not 
file content.



##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,163 @@ object ColumnarTableCachePartitionStatsBenchmark extends 
SqlBasedBenchmark {
     }
   }
 
+  // 
============================================================================
+  // Schema-codec intern microbench (SchemaJsonInternCache).
+  //
+  // ColumnarCachedBatchSerializer hot paths call StructType.json on every 
batch
+  // write and DataType.fromJson on every batch read. The intern cache memoizes
+  // the round-trip without changing the wire format. Sections below compare 
two
+  // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+  // (intern memoized round-trip), with no toggle on the cache class itself.
+  // 
============================================================================
+
+  private val INTERN_CAP = SchemaJsonInternCache.CAP.toInt
+
+  private def schemaFixture(numCols: Int, nameLen: Int): StructType = {
+    val name = "c" + ("x" * math.max(0, nameLen - 1))
+    StructType(
+      (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable = 
true)))
+  }
+
+  // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name 
shape.
+  private def realisticSchema: StructType = StructType(
+    Seq(
+      StructField("ss_sold_date_sk", IntegerType),
+      StructField("ss_sold_time_sk", IntegerType),
+      StructField("ss_item_sk", IntegerType),
+      StructField("ss_customer_sk", IntegerType),
+      StructField("ss_cdemo_sk", IntegerType),
+      StructField("ss_hdemo_sk", IntegerType),
+      StructField("ss_addr_sk", IntegerType),
+      StructField("ss_store_sk", IntegerType),
+      StructField("ss_promo_sk", IntegerType),
+      StructField("ss_ticket_number", LongType),
+      StructField("ss_quantity", IntegerType),
+      StructField("ss_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_list_price", DecimalType(7, 2)),
+      StructField("ss_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_discount_amt", DecimalType(7, 2)),
+      StructField("ss_ext_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_ext_list_price", DecimalType(7, 2)),
+      StructField("ss_ext_tax", DecimalType(7, 2)),
+      StructField("ss_coupon_amt", DecimalType(7, 2)),
+      StructField("ss_net_paid", DecimalType(7, 2)),
+      StructField("ss_net_paid_inc_tax", DecimalType(7, 2)),
+      StructField("ss_net_profit", DecimalType(7, 2))
+    ))
+
+  private val internSchemas: Seq[(String, StructType)] =
+    (for {
+      width <- Seq(10, 100, 1000)
+      nameLen <- Seq(1, 32)
+    } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+
+      ("tpcds-store_sales-23col" -> realisticSchema)
+
+  private def runInternEncode(label: String, schema: StructType): Unit = {
+    val N = 1L * 1000 * 1000
+    val intern = new SchemaJsonInternCache
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw schema.json.getBytes per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = schema.json.getBytes(StandardCharsets.UTF_8)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on  (intern.encodeBytes: cached canonical bytes)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = intern.encodeBytes(schema)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  private def runInternDecode(label: String, schema: StructType): Unit = {
+    val N = 1L * 100 * 1000
+    val intern = new SchemaJsonInternCache
+    val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8)
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw DataType.fromJson per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = DataType
+            .fromJson(new String(jsonBytes, StandardCharsets.UTF_8))
+            .asInstanceOf[StructType]
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on  (intern.decodeStructType: cached canonical 
StructType)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = intern.decodeStructType(jsonBytes)
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  // Working-set sweep across three regimes around cap = 256:
+  //   C1 == cap     -> 100% hit steady state
+  //   C2 == 2 x cap -> eviction pressure, partial hit
+  //   C3 == 4 x cap -> worst-case round-robin, ~all miss
+  // Manual interpretation guidance when reading results: expect C1 on >= off; 
C2 on within
+  // ~1.5x of off; C3 documented as known regression. The benchmark itself 
does not enforce
+  // any of these.

Review Comment:
   Adopted in 0f2e4d9054 — rewrote the working-set sweep prose to describe 
regimes only (C1/C2/C3 in terms of distinct-schema count vs cap) and point 
readers to the committed `-results.txt` for actual numbers. The prior "C3 known 
regression" claim was contradicted by the data on this PR (`-results.txt` shows 
C3 on=24.0× off), so the prediction is removed in favor of letting the file 
speak for itself.



##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+/**
+ * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs 
yield byte-identical /
+ * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never 
corrupts later results;
+ * (3) concurrency -- contended get-or-compute yields correct results without 
exception.
+ */
+class SchemaJsonInternCacheSuite extends SparkFunSuite {
+
+  private def schemaOfWidth(n: Int): StructType =
+    StructType((0 until n).map(i => StructField(s"c$i", LongType, nullable = 
true)))
+
+  // === Invariant 1: determinism ===
+
+  test("encode is deterministic: same StructType => byte-identical output") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(10)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.sameElements(b), "encodeBytes must be deterministic for equal 
inputs")
+    // intern is a memoizer, not a transformer
+    val raw = s.json.getBytes(StandardCharsets.UTF_8)
+    assert(a.sameElements(raw), "encodeBytes(s) must equal 
s.json.getBytes(UTF_8)")
+  }
+
+  test("decode is deterministic: same bytes => structurally-equal StructType") 
{
+    val intern = new SchemaJsonInternCache
+    val s = StructType(Seq(
+      StructField("a", IntegerType),
+      StructField("b", StringType),
+      StructField("c", LongType, nullable = false)))
+    val bytes = s.json.getBytes(StandardCharsets.UTF_8)
+    val d1 = intern.decodeStructType(bytes)
+    val d2 = intern.decodeStructType(bytes)
+    assert(d1 == s)
+    assert(d2 == s)
+    // canonical-instance contract: equal bytes => same instance (saves 
repeated parse cost)
+    assert(d1.eq(d2), "decodeStructType must return the same canonical 
instance for equal bytes")
+  }
+
+  test("encode canonicality: same StructType returns the same byte array 
instance") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(5)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.eq(b), "encodeBytes must return the same canonical byte array for 
equal inputs")
+  }
+
+  // === Invariant 2: capacity ===
+
+  test("cap = 256 entries: eviction past cap does not corrupt later results") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val total = cap * 4 // 1024 distinct schemas, forces ~75% miss rate
+    val schemas = (0 until total).map(i => schemaOfWidth(8 + (i % 16)))
+    schemas.foreach(intern.encodeBytes)
+    schemas.zipWithIndex.foreach {
+      case (s, i) =>
+        val cached = intern.encodeBytes(s)
+        val raw = s.json.getBytes(StandardCharsets.UTF_8)
+        assert(
+          cached.sameElements(raw),
+          s"entry $i (width=${s.length}) was corrupted across eviction cycles")
+    }
+  }
+
+  test("decode under cap pressure: >= cap distinct bytes still all decode 
correctly") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val distinct = cap * 4
+    val pairs = (0 until distinct).map {
+      i =>
+        val s = schemaOfWidth(8 + (i % 16))
+        (s, s.json.getBytes(StandardCharsets.UTF_8))
+    }
+    // walk twice -- second walk hits a mix of evicted and live entries
+    pairs.foreach { case (_, bytes) => intern.decodeStructType(bytes) }
+    pairs.foreach {
+      case (s, bytes) =>
+        val decoded = intern.decodeStructType(bytes)
+        assert(decoded == s, s"decoded != expected for width=${s.length}")
+    }
+  }
+
+  // === Invariant 3: concurrency ===
+
+  test("concurrent get-or-compute: N threads on overlapping keys yields 
correct results") {
+    val intern = new SchemaJsonInternCache
+    val threads = 8
+    val keysPerThread = 200
+    val sharedKeySpace = 64 // overlap forces contention on same cache slots
+    val schemas = (0 until sharedKeySpace).map(i => schemaOfWidth(8 + (i % 
12)))
+
+    val pool = Executors.newFixedThreadPool(threads)
+    val start = new CountDownLatch(1)
+    val errors = new AtomicInteger(0)
+    val random = new Random(42)
+
+    val futures = (0 until threads).map {
+      tid =>
+        val rnd = new Random(random.nextLong())
+        pool.submit(new Runnable {
+          override def run(): Unit = {
+            start.await()
+            var i = 0
+            while (i < keysPerThread) {
+              val s = schemas(rnd.nextInt(sharedKeySpace))
+              try {
+                val enc = intern.encodeBytes(s)
+                val raw = s.json.getBytes(StandardCharsets.UTF_8)
+                if (!enc.sameElements(raw)) errors.incrementAndGet()
+
+                val dec = intern.decodeStructType(raw)
+                if (dec != s) errors.incrementAndGet()
+              } catch {
+                case _: Throwable => errors.incrementAndGet()
+              }
+              i += 1
+            }
+          }
+        })
+    }
+    start.countDown()
+    futures.foreach(_.get(60, TimeUnit.SECONDS))
+    pool.shutdown()
+    assert(pool.awaitTermination(10, TimeUnit.SECONDS), "thread pool did not 
terminate")
+    assert(
+      errors.get() == 0,
+      s"${errors.get()} concurrent get-or-compute errors out of ${threads * 
keysPerThread} ops")
+  }

Review Comment:
   Already addressed in 8470627513 — `SchemaJsonInternCacheSuite.scala` 
L149-156 wraps `futures.foreach(_.get(...))` in try/finally so 
`pool.shutdown()` + `awaitTermination(...)` always run. Looks like this 
re-review was generated against a stale snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to