yaooqinn commented on code in PR #12236:
URL: https://github.com/apache/gluten/pull/12236#discussion_r3366449098
##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+/**
+ * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs yield byte-identical /
+ * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never corrupts later results;
+ * (3) concurrency -- contended get-or-compute yields correct results without exception.

Review Comment:
   Adopted in 0f2e4d9054: suite scaladoc updated to "size-bounded cap = 256". (The production class scaladoc was already fixed in 8470627513; this was a missed second occurrence in the test file.)
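The suite asserts canonical identity (`eq`) only on back-to-back calls. Its capacity tests compare contents instead (`sameElements`, `==`). The sketch below shows why, using a strict-LRU stand-in: the hypothetical `BoundedMemo`, not the PR's class. This thread describes the real cache as a size-bounded Caffeine cache (W-TinyLFU). The point is that an evicted entry is recomputed by the same pure function, so it comes back equal but as a new instance.

```scala
import java.nio.charset.StandardCharsets
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stand-in for SchemaJsonInternCache: a size-bounded memoizer using strict LRU
// (LinkedHashMap in access order). The real class uses Caffeine (W-TinyLFU, not LRU); only the
// "evict, then recompute with the same pure function" behaviour matters for this illustration.
final class BoundedMemo[K, V <: AnyRef](cap: Int)(compute: K => V) {
  private val map: JLinkedHashMap[K, V] = new JLinkedHashMap[K, V](16, 0.75f, true) {
    // Drop the least-recently-used entry once the map grows past `cap`.
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > cap
  }
  private var computeCount = 0

  def computations: Int = synchronized(computeCount)

  /** Get-or-compute: a miss runs the same pure function the no-cache path would run. */
  def get(key: K): V = synchronized {
    val hit = map.get(key)
    if (hit != null) hit
    else {
      computeCount += 1
      val v = compute(key)
      map.put(key, v)
      v
    }
  }
}

object CanonicalVsEqual {
  /** Returns (eq on hit, eq after eviction, contents equal after eviction, compute count). */
  def demo(): (Boolean, Boolean, Boolean, Int) = {
    val memo = new BoundedMemo[String, Array[Byte]](cap = 2)(_.getBytes(StandardCharsets.UTF_8))
    val a1 = memo.get("a")
    val a2 = memo.get("a") // hit: the same canonical instance comes back
    memo.get("b")
    memo.get("c")          // cap = 2, so "a" (least recently used) is evicted here
    val a3 = memo.get("a") // miss: recomputed by the same pure function
    (a1 eq a2, a1 eq a3, a1.sameElements(a3), memo.computations)
  }
}
```

Identity is therefore a property of a resident entry, not of the cache as a whole.

A related point is worth checking in the capacity tests further down. They build keys as `schemaOfWidth(8 + (i % 16))`, which yields only 16 structurally distinct schemas. If the production cache compares keys by `equals` (which `StructType` implements structurally), those 1024 iterations may never push it past cap = 256.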
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key, mappingFunction)`.
+ */

Review Comment:
   Already addressed in 8470627513: `SchemaJsonInternCache.scala` L26-29 now reads "size-bounded Caffeine cache (W-TinyLFU); eviction recomputes via the same pure codec, so misses are indistinguishable from the no-cache baseline." Looks like this re-review was generated against a stale snapshot.
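The results-file hunk that follows derives Rate, Per Row, and Relative from Best Time. The small sketch below lets you check those columns by hand. It assumes Spark's `Benchmark` computes them as:

- Rate(M/s) = N / bestMs / 1000
- Per Row(ns) = 1000 / Rate
- Relative = the first case's bestMs / this case's bestMs

The table's own numbers are consistent with these formulas and with N = 100,000,000 rows. That row count is inferred from the table, not stated. `BenchmarkColumns` is a hypothetical helper.

```scala
import java.util.Locale

// Hypothetical helper: recomputes the derived columns of a Spark `Benchmark` report,
// assuming Rate(M/s) = N / bestMs / 1000, Per Row(ns) = 1000 / Rate, and
// Relative = (first case's bestMs) / (this case's bestMs).
object BenchmarkColumns {
  def rateMps(n: Long, bestMs: Double): Double = n / bestMs / 1000.0

  def perRowNs(n: Long, bestMs: Double): Double = 1000.0 / rateMps(n, bestMs)

  def relative(firstBestMs: Double, bestMs: Double): Double = firstBestMs / bestMs

  /** "Rate PerRow Relative" with one decimal place, locale-independent. */
  def row(n: Long, firstBestMs: Double, bestMs: Double): String =
    "%.1f %.1f %.1fX".formatLocal(
      Locale.ROOT, rateMps(n, bestMs), perRowNs(n, bestMs), relative(firstBestMs, bestMs))
}
```

With N = 100,000,000, `row(n, 127107, 127107)` gives `0.8 1271.1 1.0X` and `row(n, 127107, 134398)` gives `0.7 1344.0 0.9X`, matching the new rows. In the old run, `partitionStats on` had a ratio of 126425 / 131431 ≈ 0.96, which prints as `1.0X`. The change from 1.0X to 0.9X therefore reflects a ratio moving from about 0.96 to about 0.95 across a rounding boundary.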
##########
benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt:
##########
@@ -1,28 +1,162 @@
-OpenJDK 64-Bit Server VM 17.0.18+8-Ubuntu-124.04.1 on Linux 6.6.87.2-microsoft-standard-WSL2
+OpenJDK 64-Bit Server VM 17.0.18+8 on Linux 6.6.87.2-microsoft-standard-WSL2
 AMD EPYC 7763 64-Core Processor
 table cache build:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-partitionStats off                               126425         138565       10546          0.8        1264.3       1.0X
-partitionStats on                                131431         137094        7581          0.8        1314.3       1.0X
+partitionStats off                               127107         127856         668          0.8        1271.1       1.0X
+partitionStats on                                134398         146067       10193          0.7        1344.0       0.9X

Review Comment:
   This is still a false positive: the file does not have any `<N>|` prefix. `xxd benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt | head -1` confirms byte 0 = 'O' (start of "OpenJDK 64-Bit Server VM"). The leading numbers visible in the GitHub diff viewer are the diff line-number gutter, not file content.

##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,163 @@ object ColumnarTableCachePartitionStatsBenchmark extends SqlBasedBenchmark {
     }
   }
+  // ============================================================================
+  // Schema-codec intern microbench (SchemaJsonInternCache).
+  //
+  // ColumnarCachedBatchSerializer hot paths call StructType.json on every batch
+  // write and DataType.fromJson on every batch read. The intern cache memoizes
+  // the round-trip without changing the wire format. Sections below compare two
+  // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+  // (intern memoized round-trip), with no toggle on the cache class itself.
+  // ============================================================================
+
+  private val INTERN_CAP = SchemaJsonInternCache.CAP.toInt
+
+  private def schemaFixture(numCols: Int, nameLen: Int): StructType = {
+    val name = "c" + ("x" * math.max(0, nameLen - 1))
+    StructType(
+      (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable = true)))
+  }
+
+  // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name shape.
+  private def realisticSchema: StructType = StructType(
+    Seq(
+      StructField("ss_sold_date_sk", IntegerType),
+      StructField("ss_sold_time_sk", IntegerType),
+      StructField("ss_item_sk", IntegerType),
+      StructField("ss_customer_sk", IntegerType),
+      StructField("ss_cdemo_sk", IntegerType),
+      StructField("ss_hdemo_sk", IntegerType),
+      StructField("ss_addr_sk", IntegerType),
+      StructField("ss_store_sk", IntegerType),
+      StructField("ss_promo_sk", IntegerType),
+      StructField("ss_ticket_number", LongType),
+      StructField("ss_quantity", IntegerType),
+      StructField("ss_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_list_price", DecimalType(7, 2)),
+      StructField("ss_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_discount_amt", DecimalType(7, 2)),
+      StructField("ss_ext_sales_price", DecimalType(7, 2)),
+      StructField("ss_ext_wholesale_cost", DecimalType(7, 2)),
+      StructField("ss_ext_list_price", DecimalType(7, 2)),
+      StructField("ss_ext_tax", DecimalType(7, 2)),
+      StructField("ss_coupon_amt", DecimalType(7, 2)),
+      StructField("ss_net_paid", DecimalType(7, 2)),
+      StructField("ss_net_paid_inc_tax", DecimalType(7, 2)),
+      StructField("ss_net_profit", DecimalType(7, 2))
+    ))
+
+  private val internSchemas: Seq[(String, StructType)] =
+    (for {
+      width <- Seq(10, 100, 1000)
+      nameLen <- Seq(1, 32)
+    } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+
+      ("tpcds-store_sales-23col" -> realisticSchema)
+
+  private def runInternEncode(label: String, schema: StructType): Unit = {
+    val N = 1L * 1000 * 1000
+    val intern = new SchemaJsonInternCache
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw schema.json.getBytes per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = schema.json.getBytes(StandardCharsets.UTF_8)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on (intern.encodeBytes: cached canonical bytes)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val bytes = intern.encodeBytes(schema)
+          checksum ^= bytes.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  private def runInternDecode(label: String, schema: StructType): Unit = {
+    val N = 1L * 100 * 1000
+    val intern = new SchemaJsonInternCache
+    val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8)
+    val bench = new Benchmark(label, N, output = output)
+    bench.addCase("off (raw DataType.fromJson per call)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = DataType
+            .fromJson(new String(jsonBytes, StandardCharsets.UTF_8))
+            .asInstanceOf[StructType]
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.addCase("on (intern.decodeStructType: cached canonical StructType)", 5) {
+      _ =>
+        var i = 0L
+        var checksum = 0L
+        while (i < N) {
+          val s = intern.decodeStructType(jsonBytes)
+          checksum ^= s.length.toLong
+          i += 1
+        }
+        assert(checksum != Long.MinValue, s"checksum=$checksum")
+    }
+    bench.run()
+  }
+
+  // Working-set sweep across three regimes around cap = 256:
+  //   C1 == cap     -> 100% hit steady state
+  //   C2 == 2 x cap -> eviction pressure, partial hit
+  //   C3 == 4 x cap -> worst-case round-robin, ~all miss
+  // Manual interpretation guidance when reading results: expect C1 on >= off; C2 on within
+  // ~1.5x of off; C3 documented as known regression. The benchmark itself does not enforce
+  // any of these.
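The regime comments above (C2 "partial hit", C3 "~all miss") read like strict-LRU intuition. The minimal model below assumes strict LRU, cap = 256, and round-robin access in every regime; `CyclicLruModel` is hypothetical and not part of the PR. It shows that plain LRU is harsher than those comments suggest. Once a cyclic working set exceeds the capacity, every access misses, so C2 would score zero hits too.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical strict-LRU model of the working-set sweep: `distinct` keys are read round-robin
// for `passes` passes through a cache holding at most `cap` entries.
// This is NOT Caffeine's policy (W-TinyLFU); it only shows what plain LRU would do.
object CyclicLruModel {
  def hits(cap: Int, distinct: Int, passes: Int): Int = {
    val lru: JLinkedHashMap[Int, Int] = new JLinkedHashMap[Int, Int](16, 0.75f, true) {
      // Evict the least-recently-used entry once the map grows past `cap`.
      override def removeEldestEntry(eldest: JMap.Entry[Int, Int]): Boolean = size() > cap
    }
    var hitCount = 0
    for (_ <- 0 until passes; k <- 0 until distinct) {
      if (lru.containsKey(k)) {
        lru.get(k) // a hit refreshes recency, as a real read would
        hitCount += 1
      } else {
        lru.put(k, k) // a miss inserts; the eldest entry is dropped past `cap`
      }
    }
    hitCount
  }
}
```

For 4 passes this gives 768 hits for C1 (every access after the first pass) and 0 for both C2 and C3. Caffeine documents its size-based policy as W-TinyLFU, which uses access frequency when deciding what to admit. This model is therefore not a prediction for the real cache, and the committed `-results.txt` stays the source of truth.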
Review Comment:
   Adopted in 0f2e4d9054: rewrote the working-set sweep prose to describe regimes only (C1/C2/C3 in terms of distinct-schema count vs cap) and point readers to the committed `-results.txt` for actual numbers. The prior "C3 known regression" claim was contradicted by the data on this PR (`-results.txt` shows C3 on=24.0× off), so the prediction is removed in favor of letting the file speak for itself.

##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+/**
+ * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs yield byte-identical /
+ * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never corrupts later results;
+ * (3) concurrency -- contended get-or-compute yields correct results without exception.
+ */
+class SchemaJsonInternCacheSuite extends SparkFunSuite {
+
+  private def schemaOfWidth(n: Int): StructType =
+    StructType((0 until n).map(i => StructField(s"c$i", LongType, nullable = true)))
+
+  // === Invariant 1: determinism ===
+
+  test("encode is deterministic: same StructType => byte-identical output") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(10)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.sameElements(b), "encodeBytes must be deterministic for equal inputs")
+    // intern is a memoizer, not a transformer
+    val raw = s.json.getBytes(StandardCharsets.UTF_8)
+    assert(a.sameElements(raw), "encodeBytes(s) must equal s.json.getBytes(UTF_8)")
+  }
+
+  test("decode is deterministic: same bytes => structurally-equal StructType") {
+    val intern = new SchemaJsonInternCache
+    val s = StructType(Seq(
+      StructField("a", IntegerType),
+      StructField("b", StringType),
+      StructField("c", LongType, nullable = false)))
+    val bytes = s.json.getBytes(StandardCharsets.UTF_8)
+    val d1 = intern.decodeStructType(bytes)
+    val d2 = intern.decodeStructType(bytes)
+    assert(d1 == s)
+    assert(d2 == s)
+    // canonical-instance contract: equal bytes => same instance (saves repeated parse cost)
+    assert(d1.eq(d2), "decodeStructType must return the same canonical instance for equal bytes")
+  }
+
+  test("encode canonicality: same StructType returns the same byte array instance") {
+    val intern = new SchemaJsonInternCache
+    val s = schemaOfWidth(5)
+    val a = intern.encodeBytes(s)
+    val b = intern.encodeBytes(s)
+    assert(a.eq(b), "encodeBytes must return the same canonical byte array for equal inputs")
+  }
+
+  // === Invariant 2: capacity ===
+
+  test("cap = 256 entries: eviction past cap does not corrupt later results") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val total = cap * 4 // 1024 distinct schemas, forces ~75% miss rate
+    val schemas = (0 until total).map(i => schemaOfWidth(8 + (i % 16)))
+    schemas.foreach(intern.encodeBytes)
+    schemas.zipWithIndex.foreach {
+      case (s, i) =>
+        val cached = intern.encodeBytes(s)
+        val raw = s.json.getBytes(StandardCharsets.UTF_8)
+        assert(
+          cached.sameElements(raw),
+          s"entry $i (width=${s.length}) was corrupted across eviction cycles")
+    }
+  }
+
+  test("decode under cap pressure: >= cap distinct bytes still all decode correctly") {
+    val intern = new SchemaJsonInternCache
+    val cap = 256
+    val distinct = cap * 4
+    val pairs = (0 until distinct).map {
+      i =>
+        val s = schemaOfWidth(8 + (i % 16))
+        (s, s.json.getBytes(StandardCharsets.UTF_8))
+    }
+    // walk twice -- second walk hits a mix of evicted and live entries
+    pairs.foreach { case (_, bytes) => intern.decodeStructType(bytes) }
+    pairs.foreach {
+      case (s, bytes) =>
+        val decoded = intern.decodeStructType(bytes)
+        assert(decoded == s, s"decoded != expected for width=${s.length}")
+    }
+  }
+
+  // === Invariant 3: concurrency ===
+
+  test("concurrent get-or-compute: N threads on overlapping keys yields correct results") {
+    val intern = new SchemaJsonInternCache
+    val threads = 8
+    val keysPerThread = 200
+    val sharedKeySpace = 64 // overlap forces contention on same cache slots
+    val schemas = (0 until sharedKeySpace).map(i => schemaOfWidth(8 + (i % 12)))
+
+    val pool = Executors.newFixedThreadPool(threads)
+    val start = new CountDownLatch(1)
+    val errors = new AtomicInteger(0)
+    val random = new Random(42)
+
+    val futures = (0 until threads).map {
+      tid =>
+        val rnd = new Random(random.nextLong())
+        pool.submit(new Runnable {
+          override def run(): Unit = {
+            start.await()
+            var i = 0
+            while (i < keysPerThread) {
+              val s = schemas(rnd.nextInt(sharedKeySpace))
+              try {
+                val enc = intern.encodeBytes(s)
+                val raw = s.json.getBytes(StandardCharsets.UTF_8)
+                if (!enc.sameElements(raw)) errors.incrementAndGet()
+
+                val dec = intern.decodeStructType(raw)
+                if (dec != s) errors.incrementAndGet()
+              } catch {
+                case _: Throwable => errors.incrementAndGet()
+              }
+              i += 1
+            }
+          }
+        })
+    }
+    start.countDown()
+    futures.foreach(_.get(60, TimeUnit.SECONDS))
+    pool.shutdown()
+    assert(pool.awaitTermination(10, TimeUnit.SECONDS), "thread pool did not terminate")
+    assert(
+      errors.get() == 0,
+      s"${errors.get()} concurrent get-or-compute errors out of ${threads * keysPerThread} ops")
+  }

Review Comment:
   Already addressed in 8470627513: `SchemaJsonInternCacheSuite.scala` L149-156 wraps `futures.foreach(_.get(...))` in try/finally so `pool.shutdown()` + `awaitTermination(...)` always run. Looks like this re-review was generated against a stale snapshot.
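The concurrency test and the try/finally shape described in the reply above can be sketched without Spark or Caffeine. The sketch assumes `java.util.concurrent.ConcurrentHashMap.computeIfAbsent` as a stand-in for Caffeine's `Cache.get(key, mappingFunction)`. Both are documented as atomic per key, applying the mapping function at most once. Unlike the real cache, `ConcurrentHashMap` is unbounded, so nothing is evicted here. `ContendedGetOrCompute` and its parameters are hypothetical, and the `shutdownNow()` fallback is an addition not mentioned in the reply.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical sketch of a contended get-or-compute test. ConcurrentHashMap.computeIfAbsent
// stands in for Caffeine's Cache.get(key, mappingFunction); unlike the real cache it is unbounded.
object ContendedGetOrCompute {
  /** Returns (number of times the mapping function ran, number of wrong values observed). */
  def execute(threads: Int, opsPerThread: Int, keySpace: Int): (Int, Int) = {
    val cache = new ConcurrentHashMap[Integer, String]()
    val computes = new AtomicInteger(0)
    val errors = new AtomicInteger(0)
    val mapping = new JFunction[Integer, String] {
      override def apply(k: Integer): String = { computes.incrementAndGet(); s"v$k" }
    }
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (0 until threads).map { tid =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            start.await() // release all workers at once to maximize contention
            var i = 0
            while (i < opsPerThread) {
              val key = Integer.valueOf((tid + i) % keySpace)
              if (cache.computeIfAbsent(key, mapping) != s"v$key") errors.incrementAndGet()
              i += 1
            }
          }
        })
      }
      start.countDown()
      futures.foreach(_.get(60, TimeUnit.SECONDS)) // may throw; the finally block still runs
    } finally {
      // Shutdown always runs, even if a get() above timed out or a task failed.
      pool.shutdown()
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) pool.shutdownNow()
    }
    (computes.get(), errors.get())
  }
}
```

Because entries are never evicted, the mapping function should run exactly once per key. `execute(threads = 8, opsPerThread = 200, keySpace = 64)` should therefore report 64 computes and 0 errors. The computes counter is the useful extra signal. A check that only verifies results, like the suite's, would still pass with a non-atomic cache that computed some values twice, because the codec is pure.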
