Copilot commented on code in PR #12236: URL: https://github.com/apache/gluten/pull/12236#discussion_r3360559233
########## backends-velox/src/test/scala/org/apache/spark/sql/execution/SchemaJsonInternCacheSuite.scala: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} + +import java.nio.charset.StandardCharsets +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.util.Random + +/** + * Invariants for [[SchemaJsonInternCache]]: (1) determinism -- equal inputs yield byte-identical / + * canonical-instance outputs; (2) capacity -- LRU cap = 256, eviction never corrupts later results; + * (3) concurrency -- contended get-or-compute yields correct results without exception. Review Comment: The suite comment says the cache is an "LRU" with cap=256, but SchemaJsonInternCache uses Caffeine’s default eviction policy (W-TinyLFU), not strict LRU. This is misleading for readers and for future maintenance of the tests. ########## backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala: ########## @@ -59,6 +64,163 @@ object ColumnarTableCachePartitionStatsBenchmark extends SqlBasedBenchmark { } } + // ============================================================================ + // Schema-codec intern microbench (SchemaJsonInternCache). + // + // ColumnarCachedBatchSerializer hot paths call StructType.json on every batch + // write and DataType.fromJson on every batch read. The intern cache memoizes + // the round-trip without changing the wire format. Sections below compare two + // distinct method calls in the same JVM as cache off (raw codec) vs cache on + // (intern memoized round-trip), with no toggle on the cache class itself. + // ============================================================================ + + private val INTERN_CAP = SchemaJsonInternCache.CAP.toInt + + private def schemaFixture(numCols: Int, nameLen: Int): StructType = { + val name = "c" + ("x" * math.max(0, nameLen - 1)) + StructType( + (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable = true))) + } + + // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name shape. + private def realisticSchema: StructType = StructType( + Seq( + StructField("ss_sold_date_sk", IntegerType), + StructField("ss_sold_time_sk", IntegerType), + StructField("ss_item_sk", IntegerType), + StructField("ss_customer_sk", IntegerType), + StructField("ss_cdemo_sk", IntegerType), + StructField("ss_hdemo_sk", IntegerType), + StructField("ss_addr_sk", IntegerType), + StructField("ss_store_sk", IntegerType), + StructField("ss_promo_sk", IntegerType), + StructField("ss_ticket_number", LongType), + StructField("ss_quantity", IntegerType), + StructField("ss_wholesale_cost", DecimalType(7, 2)), + StructField("ss_list_price", DecimalType(7, 2)), + StructField("ss_sales_price", DecimalType(7, 2)), + StructField("ss_ext_discount_amt", DecimalType(7, 2)), + StructField("ss_ext_sales_price", DecimalType(7, 2)), + StructField("ss_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ss_ext_list_price", DecimalType(7, 2)), + StructField("ss_ext_tax", DecimalType(7, 2)), + StructField("ss_coupon_amt", DecimalType(7, 2)), + StructField("ss_net_paid", DecimalType(7, 2)), + StructField("ss_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ss_net_profit", DecimalType(7, 2)) + )) + + private val internSchemas: Seq[(String, StructType)] = + (for { + width <- Seq(10, 100, 1000) + nameLen <- Seq(1, 32) + } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+ + ("tpcds-store_sales-23col" -> realisticSchema) + + private def runInternEncode(label: String, schema: StructType): Unit = { + val N = 1L * 1000 * 1000 + val intern = new SchemaJsonInternCache + val bench = new Benchmark(label, N, output = output) + bench.addCase("off (raw schema.json.getBytes per call)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val bytes = schema.json.getBytes(StandardCharsets.UTF_8) + checksum ^= bytes.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.addCase("on (intern.encodeBytes: cached canonical bytes)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val bytes = intern.encodeBytes(schema) + checksum ^= bytes.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.run() + } + + private def runInternDecode(label: String, schema: StructType): Unit = { + val N = 1L * 100 * 1000 + val intern = new SchemaJsonInternCache + val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8) + val bench = new Benchmark(label, N, output = output) + bench.addCase("off (raw DataType.fromJson per call)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val s = DataType + .fromJson(new String(jsonBytes, StandardCharsets.UTF_8)) + .asInstanceOf[StructType] + checksum ^= s.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.addCase("on (intern.decodeStructType: cached canonical StructType)", 5) { + _ => + var i = 0L + var checksum = 0L + while (i < N) { + val s = intern.decodeStructType(jsonBytes) + checksum ^= s.length.toLong + i += 1 + } + assert(checksum != Long.MinValue, s"checksum=$checksum") + } + bench.run() + } + + // Working-set sweep across three regimes around cap = 256: + // C1 == cap -> 100% hit steady state + // C2 == 2 x cap -> eviction pressure, partial hit + // C3 == 4 x cap -> worst-case round-robin, ~all miss + // Manual interpretation guidance when reading results: expect C1 on >= off; C2 on within + // ~1.5x of off; C3 documented as known regression. The benchmark itself does not enforce + // any of these. Review Comment: The working-set sweep comment hard-codes performance expectations (including calling C3 a "known regression"), but the committed benchmark results for this PR show the "on" path faster even under churn. This guidance is likely to confuse readers; it’s safer to describe what the cases represent without asserting expected outcomes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
