Copilot commented on code in PR #12236: URL: https://github.com/apache/gluten/pull/12236#discussion_r3357400023
########## backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.types.{DataType, StructType} + +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} + +import java.nio.charset.StandardCharsets + +/** + * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch hot path. Best-effort + * Caffeine LRU; eviction recomputes via the same pure codec, so misses are indistinguishable from + * the no-cache baseline. Thread-safety via Caffeine `get(key, mappingFunction)`. + */ Review Comment: The scaladoc says this is an "LRU" cache, but `Caffeine.newBuilder.maximumSize(...)` uses Caffeine's size-bounded eviction policy (W-TinyLFU), not strict LRU. This wording could mislead readers about eviction behavior; consider describing it as a size-bounded cache instead. ########## backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.types.{DataType, StructType} + +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} + +import java.nio.charset.StandardCharsets + +/** + * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch hot path. Best-effort + * Caffeine LRU; eviction recomputes via the same pure codec, so misses are indistinguishable from + * the no-cache baseline. Thread-safety via Caffeine `get(key, mappingFunction)`. + */ +final private[execution] class SchemaJsonInternCache { + import SchemaJsonInternCache._ + + private val encodeCache: Cache[StructType, Array[Byte]] = + Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]() + + private val decodeCache: Cache[String, StructType] = + Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]() + + /** Returns the canonical UTF-8 JSON byte form of `schema`. */ + def encodeBytes(schema: StructType): Array[Byte] = + encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8)) Review Comment: `encodeBytes` returns the cached backing `Array[Byte]`. Because arrays are mutable, any accidental in-place modification by a caller would corrupt the cache and subsequent serializations. At minimum, document that the returned array must be treated as immutable (or return a defensive copy if you need stronger safety). ########## backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} + +import java.nio.charset.StandardCharsets +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.util.Random + +/** + * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs yield byte-identical / + * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never corrupts later results; + * (3) concurrency -- contended get-or-compute yields correct results without exception. + */ +class SchemaJsonInternCacheSuite extends SparkFunSuite { + + private def schemaOfWidth(n: Int): StructType = + StructType((0 until n).map(i => StructField(s"c$i", LongType, nullable = true))) + + // === Invariant 1: determinism === + + test("encode is deterministic: same StructType => byte-identical output") { + val intern = new SchemaJsonInternCache + val s = schemaOfWidth(10) + val a = intern.encodeBytes(s) + val b = intern.encodeBytes(s) + assert(a.sameElements(b), "encodeBytes must be deterministic for equal inputs") + // intern is a memoizer, not a transformer + val raw = s.json.getBytes(StandardCharsets.UTF_8) + assert(a.sameElements(raw), "encodeBytes(s) must equal s.json.getBytes(UTF_8)") + } + + test("decode is deterministic: same bytes => structurally-equal StructType") { + val intern = new SchemaJsonInternCache + val s = StructType(Seq( + StructField("a", IntegerType), + StructField("b", StringType), + StructField("c", LongType, nullable = false))) + val bytes = s.json.getBytes(StandardCharsets.UTF_8) + val d1 = intern.decodeStructType(bytes) + val d2 = intern.decodeStructType(bytes) + assert(d1 == s) + assert(d2 == s) + // canonical-instance contract: equal bytes => same instance (saves repeated parse cost) + assert(d1.eq(d2), "decodeStructType must return the same canonical instance for equal bytes") + } + + test("encode canonicality: same StructType returns the same byte array instance") { + val intern = new SchemaJsonInternCache + val s = schemaOfWidth(5) + val a = intern.encodeBytes(s) + val b = intern.encodeBytes(s) + assert(a.eq(b), "encodeBytes must return the same canonical byte array for equal inputs") + } + + // === Invariant 2: capacity === + + test("cap = 256 entries: eviction past cap does not corrupt later results") { + val intern = new SchemaJsonInternCache + val cap = 256 + val total = cap * 4 // 1024 distinct schemas, forces ~75% miss rate + val schemas = (0 until total).map(i => schemaOfWidth(8 + (i % 16))) + schemas.foreach(intern.encodeBytes) + schemas.zipWithIndex.foreach { + case (s, i) => + val cached = intern.encodeBytes(s) + val raw = s.json.getBytes(StandardCharsets.UTF_8) + assert( + cached.sameElements(raw), + s"entry $i (width=${s.length}) was corrupted across eviction cycles") + } + } + + test("decode under cap pressure: >= cap distinct bytes still all decode correctly") { + val intern = new SchemaJsonInternCache + val cap = 256 + val distinct = cap * 4 + val pairs = (0 until distinct).map { + i => + val s = schemaOfWidth(8 + (i % 16)) + (s, s.json.getBytes(StandardCharsets.UTF_8)) + } + // walk twice -- second walk hits a mix of evicted and live entries + pairs.foreach { case (_, bytes) => intern.decodeStructType(bytes) } + pairs.foreach { + case (s, bytes) => + val decoded = intern.decodeStructType(bytes) + assert(decoded == s, s"decoded != expected for width=${s.length}") + } + } + + // === Invariant 3: concurrency === + + test("concurrent get-or-compute: N threads on overlapping keys yields correct results") { + val intern = new SchemaJsonInternCache + val threads = 8 + val keysPerThread = 200 + val sharedKeySpace = 64 // overlap forces contention on same cache slots + val schemas = (0 until sharedKeySpace).map(i => schemaOfWidth(8 + (i % 12))) + + val pool = Executors.newFixedThreadPool(threads) + val start = new CountDownLatch(1) + val errors = new AtomicInteger(0) + val random = new Random(42) + + val futures = (0 until threads).map { + tid => + val rnd = new Random(random.nextLong()) + pool.submit(new Runnable { + override def run(): Unit = { + start.await() + var i = 0 + while (i < keysPerThread) { + val s = schemas(rnd.nextInt(sharedKeySpace)) + try { + val enc = intern.encodeBytes(s) + val raw = s.json.getBytes(StandardCharsets.UTF_8) + if (!enc.sameElements(raw)) errors.incrementAndGet() + + val dec = intern.decodeStructType(raw) + if (dec != s) errors.incrementAndGet() + } catch { + case _: Throwable => errors.incrementAndGet() + } + i += 1 + } + } + }) + } + start.countDown() + futures.foreach(_.get(60, TimeUnit.SECONDS)) + pool.shutdown() + assert(pool.awaitTermination(10, TimeUnit.SECONDS), "thread pool did not terminate") + assert( + errors.get() == 0, + s"${errors.get()} concurrent get-or-compute errors out of ${threads * keysPerThread} ops") + } Review Comment: If `futures.foreach(_.get(...))` throws (timeout / execution error), the thread pool won't be shut down, which can leak non-daemon threads and hang the rest of the test suite/CI JVM. Use a try/finally to ensure `shutdown()` / `awaitTermination()` always run. ########## backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala: ########## @@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends SqlBasedBenchmark { } } + // ============================================================================ + // Schema-codec intern microbench (SchemaJsonInternCache). + // + // ColumnarCachedBatchSerializer hot paths call StructType.json on every batch + // write and DataType.fromJson on every batch read. The intern cache memoizes + // the round-trip without changing the wire format. Sections below compare two + // distinct method calls in the same JVM as cache off (raw codec) vs cache on + // (intern memoized round-trip), with no toggle on the cache class itself. + // ============================================================================ + + private val INTERN_CAP = 256 Review Comment: `INTERN_CAP` duplicates the cache capacity constant. If `SchemaJsonInternCache`'s CAP ever changes, this benchmark will silently go out of sync. Consider referencing `SchemaJsonInternCache.CAP` (after making it `private[execution]`) instead of hard-coding 256 here. ########## benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt: ########## @@ -1,28 +1,162 @@ -OpenJDK 64-Bit Server VM 17.0.18+8-Ubuntu-124.04.1 on Linux 6.6.87.2-microsoft-standard-WSL2 +OpenJDK 64-Bit Server VM 17.0.18+8 on Linux 6.6.87.2-microsoft-standard-WSL2 AMD EPYC 7763 64-Core Processor table cache build: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -partitionStats off 126425 138565 10546 0.8 1264.3 1.0X -partitionStats on 131431 137094 7581 0.8 1314.3 1.0X +partitionStats off 127107 127856 668 0.8 1271.1 1.0X +partitionStats on 134398 146067 10193 0.7 1344.0 0.9X Review Comment: This benchmark-results file now has leading line numbers and `|` separators on every line. If the intent is to check in raw Spark `Benchmark` output for easy diffing, these prefixes make comparisons noisier and suggest the content may have been passed through a formatter like `nl`. Consider regenerating/committing the plain output format if available. ########## backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.types.{DataType, StructType} + +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} + +import java.nio.charset.StandardCharsets + +/** + * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch hot path. Best-effort + * Caffeine LRU; eviction recomputes via the same pure codec, so misses are indistinguishable from + * the no-cache baseline. Thread-safety via Caffeine `get(key, mappingFunction)`. + */ +final private[execution] class SchemaJsonInternCache { + import SchemaJsonInternCache._ + + private val encodeCache: Cache[StructType, Array[Byte]] = + Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]() + + private val decodeCache: Cache[String, StructType] = + Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]() + + /** Returns the canonical UTF-8 JSON byte form of `schema`. */ + def encodeBytes(schema: StructType): Array[Byte] = + encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8)) + + /** Returns the canonical [[StructType]] parsed from `bytes` (UTF-8 JSON). */ + def decodeStructType(bytes: Array[Byte]): StructType = { + val key = new String(bytes, StandardCharsets.UTF_8) + decodeCache.get(key, k => DataType.fromJson(k).asInstanceOf[StructType]) + } +} + +private[execution] object SchemaJsonInternCache { + // 256 entries: <= ~8.5 MB retained even at 1000-field schemas (~33 KB JSON each). Verified by + // Section C working-set sweep of the FU-D7 bench harness; revisit if C1/C2 gates fail. + private val CAP = 256L Review Comment: The CAP comment references an undefined "FU-D7" harness and the memory estimate appears to only account for one side of the cache. Since both encode (bytes) and decode (String key + StructType) caches retain data, the bound is higher than ~8.5 MB in the worst case. Also, hard-coding `256` across tests/benchmarks is easier to keep consistent if `CAP` is package-visible. ########## backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala: ########## @@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends SqlBasedBenchmark { } } + // ============================================================================ + // Schema-codec intern microbench (SchemaJsonInternCache). + // + // ColumnarCachedBatchSerializer hot paths call StructType.json on every batch + // write and DataType.fromJson on every batch read. The intern cache memoizes + // the round-trip without changing the wire format. Sections below compare two + // distinct method calls in the same JVM as cache off (raw codec) vs cache on + // (intern memoized round-trip), with no toggle on the cache class itself. + // ============================================================================ + + private val INTERN_CAP = 256 + + private def schemaFixture(numCols: Int, nameLen: Int): StructType = { + val name = "c" + ("x" * math.max(0, nameLen - 1)) + StructType( + (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable = true))) + } + + // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name shape. + private def realisticSchema: StructType = StructType( + Seq( + StructField("ss_sold_date_sk", IntegerType), + StructField("ss_sold_time_sk", IntegerType), + StructField("ss_item_sk", IntegerType), + StructField("ss_customer_sk", IntegerType), + StructField("ss_cdemo_sk", IntegerType), + StructField("ss_hdemo_sk", IntegerType), + StructField("ss_addr_sk", IntegerType), + StructField("ss_store_sk", IntegerType), + StructField("ss_promo_sk", IntegerType), + StructField("ss_ticket_number", LongType), + StructField("ss_quantity", IntegerType), + StructField("ss_wholesale_cost", DecimalType(7, 2)), + StructField("ss_list_price", DecimalType(7, 2)), + StructField("ss_sales_price", DecimalType(7, 2)), + StructField("ss_ext_discount_amt", DecimalType(7, 2)), + StructField("ss_ext_sales_price", DecimalType(7, 2)), + StructField("ss_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ss_ext_list_price", DecimalType(7, 2)), + StructField("ss_ext_tax", DecimalType(7, 2)), + StructField("ss_coupon_amt", DecimalType(7, 2)), + StructField("ss_net_paid", DecimalType(7, 2)), + StructField("ss_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ss_net_profit", DecimalType(7, 2)) + )) + + private val internSchemas: Seq[(String, StructType)] = + (for { + width <- Seq(10, 100, 1000) + nameLen <- Seq(1, 32) + } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+ + ("tpcds-store_sales-23col" -> realisticSchema) + + private def runInternEncode(label: String, schema: StructType): Unit = { + val N = 1L * 1000 * 1000 + val intern = new SchemaJsonInternCache + val bench = new Benchmark(label, N, output = output) + bench.addCase("off (raw schema.json.getBytes per call)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val bytes = schema.json.getBytes(StandardCharsets.UTF_8) + checksum ^= bytes.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.addCase("on (intern.encodeBytes: cached canonical bytes)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val bytes = intern.encodeBytes(schema) + checksum ^= bytes.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.run() + } + + private def runInternDecode(label: String, schema: StructType): Unit = { + val N = 1L * 100 * 1000 + val intern = new SchemaJsonInternCache + val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8) + val bench = new Benchmark(label, N, output = output) + bench.addCase("off (raw DataType.fromJson per call)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val s = DataType + .fromJson(new String(jsonBytes, StandardCharsets.UTF_8)) + .asInstanceOf[StructType] + checksum ^= s.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.addCase("on (intern.decodeStructType: cached canonical StructType)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val s = intern.decodeStructType(jsonBytes) + checksum ^= s.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.run() + } + + // Working-set sweep across three regimes around cap = 256: + // C1 == cap -> 100% hit steady state + // C2 == 2 x cap -> eviction pressure, partial hit + // C3 == 4 x cap -> worst-case round-robin, ~all miss + // Gates (read at results-read time): + // C1 on must be >= off; C2 on within 1.5x of off; C3 documented as known regression. Review Comment: These lines describe performance "gates", but the benchmark doesn't actually enforce any thresholds—it's just emitting numbers. Consider rewording to make it clear this is a manual interpretation guideline (or remove the gating language). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
