yaooqinn commented on code in PR #12236:
URL: https://github.com/apache/gluten/pull/12236#discussion_r3360547990
##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends
SqlBasedBenchmark {
}
}
+ // ============================================================================
+ // Schema-codec intern microbench (SchemaJsonInternCache).
+ //
+ // ColumnarCachedBatchSerializer hot paths call StructType.json on every
batch
+ // write and DataType.fromJson on every batch read. The intern cache memoizes
+ // the round-trip without changing the wire format. Sections below compare
two
+ // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+ // (intern memoized round-trip), with no toggle on the cache class itself.
+ // ============================================================================
+
+ private val INTERN_CAP = 256
+
+ private def schemaFixture(numCols: Int, nameLen: Int): StructType = {
+ val name = "c" + ("x" * math.max(0, nameLen - 1))
+ StructType(
+ (0 until numCols).map(i => StructField(s"$name$i", LongType, nullable =
true)))
+ }
+
+ // TPC-DS store_sales-derived 23-col mixed-type fixture; realistic name
shape.
+ private def realisticSchema: StructType = StructType(
+ Seq(
+ StructField("ss_sold_date_sk", IntegerType),
+ StructField("ss_sold_time_sk", IntegerType),
+ StructField("ss_item_sk", IntegerType),
+ StructField("ss_customer_sk", IntegerType),
+ StructField("ss_cdemo_sk", IntegerType),
+ StructField("ss_hdemo_sk", IntegerType),
+ StructField("ss_addr_sk", IntegerType),
+ StructField("ss_store_sk", IntegerType),
+ StructField("ss_promo_sk", IntegerType),
+ StructField("ss_ticket_number", LongType),
+ StructField("ss_quantity", IntegerType),
+ StructField("ss_wholesale_cost", DecimalType(7, 2)),
+ StructField("ss_list_price", DecimalType(7, 2)),
+ StructField("ss_sales_price", DecimalType(7, 2)),
+ StructField("ss_ext_discount_amt", DecimalType(7, 2)),
+ StructField("ss_ext_sales_price", DecimalType(7, 2)),
+ StructField("ss_ext_wholesale_cost", DecimalType(7, 2)),
+ StructField("ss_ext_list_price", DecimalType(7, 2)),
+ StructField("ss_ext_tax", DecimalType(7, 2)),
+ StructField("ss_coupon_amt", DecimalType(7, 2)),
+ StructField("ss_net_paid", DecimalType(7, 2)),
+ StructField("ss_net_paid_inc_tax", DecimalType(7, 2)),
+ StructField("ss_net_profit", DecimalType(7, 2))
+ ))
+
+ private val internSchemas: Seq[(String, StructType)] =
+ (for {
+ width <- Seq(10, 100, 1000)
+ nameLen <- Seq(1, 32)
+ } yield (s"w=$width n=$nameLen", schemaFixture(width, nameLen))) :+
+ ("tpcds-store_sales-23col" -> realisticSchema)
+
+ private def runInternEncode(label: String, schema: StructType): Unit = {
+ val N = 1L * 1000 * 1000
+ val intern = new SchemaJsonInternCache
+ val bench = new Benchmark(label, N, output = output)
+ bench.addCase("off (raw schema.json.getBytes per call)", 5) {
+ _ =>
+ var i = 0L
+ var checksum = 0L
+ while (i < N) {
+ val bytes = schema.json.getBytes(StandardCharsets.UTF_8)
+ checksum ^= bytes.length.toLong
+ i += 1
+ }
+ assert(checksum != Long.MinValue, s"checksum=$checksum")
+ }
+ bench.addCase("on (intern.encodeBytes: cached canonical bytes)", 5) {
+ _ =>
+ var i = 0L
+ var checksum = 0L
+ while (i < N) {
+ val bytes = intern.encodeBytes(schema)
+ checksum ^= bytes.length.toLong
+ i += 1
+ }
+ assert(checksum != Long.MinValue, s"checksum=$checksum")
+ }
+ bench.run()
+ }
+
+ private def runInternDecode(label: String, schema: StructType): Unit = {
+ val N = 1L * 100 * 1000
+ val intern = new SchemaJsonInternCache
+ val jsonBytes = schema.json.getBytes(StandardCharsets.UTF_8)
+ val bench = new Benchmark(label, N, output = output)
+ bench.addCase("off (raw DataType.fromJson per call)", 5) {
+ _ =>
+ var i = 0L
+ var checksum = 0L
+ while (i < N) {
+ val s = DataType
+ .fromJson(new String(jsonBytes, StandardCharsets.UTF_8))
+ .asInstanceOf[StructType]
+ checksum ^= s.length.toLong
+ i += 1
+ }
+ assert(checksum != Long.MinValue, s"checksum=$checksum")
+ }
+ bench.addCase("on (intern.decodeStructType: cached canonical StructType)", 5) {
+ _ =>
+ var i = 0L
+ var checksum = 0L
+ while (i < N) {
+ val s = intern.decodeStructType(jsonBytes)
+ checksum ^= s.length.toLong
+ i += 1
+ }
+ assert(checksum != Long.MinValue, s"checksum=$checksum")
+ }
+ bench.run()
+ }
+
+ // Working-set sweep across three regimes around cap = 256:
+ // C1 == cap -> 100% hit steady state
+ // C2 == 2 x cap -> eviction pressure, partial hit
+ // C3 == 4 x cap -> worst-case round-robin, ~all miss
+ // Gates (read at results-read time):
+ // C1 on must be >= off; C2 on within 1.5x of off; C3 documented as known regression.
Review Comment:
Adopted in 8470627513 — reworded to "Manual interpretation guidance when
reading results" with an explicit "the benchmark itself does not enforce any of
these" qualifier.
##########
backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/ColumnarTableCachePartitionStatsBenchmark.scala:
##########
@@ -59,6 +64,162 @@ object ColumnarTableCachePartitionStatsBenchmark extends
SqlBasedBenchmark {
}
}
+ //
============================================================================
+ // Schema-codec intern microbench (SchemaJsonInternCache).
+ //
+ // ColumnarCachedBatchSerializer hot paths call StructType.json on every
batch
+ // write and DataType.fromJson on every batch read. The intern cache memoizes
+ // the round-trip without changing the wire format. Sections below compare
two
+ // distinct method calls in the same JVM as cache off (raw codec) vs cache on
+ // (intern memoized round-trip), with no toggle on the cache class itself.
+ //
============================================================================
+
+ private val INTERN_CAP = 256
Review Comment:
Already auto-adopted by Copilot Autofix in a904bedf1d — `INTERN_CAP` now
references `SchemaJsonInternCache.CAP.toInt`.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key,
mappingFunction)`.
+ */
Review Comment:
Adopted in 8470627513 — scaladoc rewritten to "size-bounded Caffeine cache
(W-TinyLFU)". Thanks for the precision.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key,
mappingFunction)`.
+ */
+final private[execution] class SchemaJsonInternCache {
+ import SchemaJsonInternCache._
+
+ private val encodeCache: Cache[StructType, Array[Byte]] =
+ Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]()
+
+ private val decodeCache: Cache[String, StructType] =
+ Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]()
+
+ /** Returns the canonical UTF-8 JSON byte form of `schema`. */
+ def encodeBytes(schema: StructType): Array[Byte] =
+ encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8))
+
+ /** Returns the canonical [[StructType]] parsed from `bytes` (UTF-8 JSON). */
+ def decodeStructType(bytes: Array[Byte]): StructType = {
+ val key = new String(bytes, StandardCharsets.UTF_8)
+ decodeCache.get(key, k => DataType.fromJson(k).asInstanceOf[StructType])
+ }
+}
+
+private[execution] object SchemaJsonInternCache {
+ // 256 entries: <= ~8.5 MB retained even at 1000-field schemas (~33 KB JSON
each). Verified by
+ // Section C working-set sweep of the FU-D7 bench harness; revisit if C1/C2
gates fail.
Review Comment:
Adopted in 8470627513 as part of the same CAP comment rewrite — dropped the
specific 8.5 MB figure (which indeed underestimates retained heap because it
only accounted for the encode side). The new wording avoids any specific MB
bound.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key,
mappingFunction)`.
+ */
+final private[execution] class SchemaJsonInternCache {
+ import SchemaJsonInternCache._
+
+ private val encodeCache: Cache[StructType, Array[Byte]] =
+ Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]()
+
+ private val decodeCache: Cache[String, StructType] =
+ Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]()
+
+ /** Returns the canonical UTF-8 JSON byte form of `schema`. */
+ def encodeBytes(schema: StructType): Array[Byte] =
+ encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8))
+
+ /** Returns the canonical [[StructType]] parsed from `bytes` (UTF-8 JSON). */
+ def decodeStructType(bytes: Array[Byte]): StructType = {
+ val key = new String(bytes, StandardCharsets.UTF_8)
+ decodeCache.get(key, k => DataType.fromJson(k).asInstanceOf[StructType])
+ }
+}
+
+private[execution] object SchemaJsonInternCache {
+ // 256 entries: <= ~8.5 MB retained even at 1000-field schemas (~33 KB JSON
each). Verified by
+ // Section C working-set sweep of the FU-D7 bench harness; revisit if C1/C2
gates fail.
+ private val CAP = 256L
Review Comment:
Adopted in 8470627513 — rewrote the CAP comment to drop the undefined
private codename and the unverified MB bound. New wording grounds the choice in
empirical fanout for typical multi-cached-table queries and references the
bench harness by file name (`ColumnarTableCachePartitionStatsBenchmark`) so a
future maintainer can re-derive it.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SchemaJsonInternCache.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+
+import java.nio.charset.StandardCharsets
+
+/**
+ * Process-local memoizer for `StructType <-> JSON` codec on the cached-batch
hot path. Best-effort
+ * Caffeine LRU; eviction recomputes via the same pure codec, so misses are
indistinguishable from
+ * the no-cache baseline. Thread-safety via Caffeine `get(key,
mappingFunction)`.
+ */
+final private[execution] class SchemaJsonInternCache {
+ import SchemaJsonInternCache._
+
+ private val encodeCache: Cache[StructType, Array[Byte]] =
+ Caffeine.newBuilder.maximumSize(CAP).build[StructType, Array[Byte]]()
+
+ private val decodeCache: Cache[String, StructType] =
+ Caffeine.newBuilder.maximumSize(CAP).build[String, StructType]()
+
+ /** Returns the canonical UTF-8 JSON byte form of `schema`. */
+ def encodeBytes(schema: StructType): Array[Byte] =
+ encodeCache.get(schema, k => k.json.getBytes(StandardCharsets.UTF_8))
+
+ /** Returns the canonical [[StructType]] parsed from `bytes` (UTF-8 JSON). */
+ def decodeStructType(bytes: Array[Byte]): StructType = {
+ val key = new String(bytes, StandardCharsets.UTF_8)
+ decodeCache.get(key, k => DataType.fromJson(k).asInstanceOf[StructType])
+ }
+}
+
+private[execution] object SchemaJsonInternCache {
+ // 256 entries: <= ~8.5 MB retained even at 1000-field schemas (~33 KB JSON
each). Verified by
+ // Section C working-set sweep of the FU-D7 bench harness; revisit if C1/C2
gates fail.
+ private val CAP = 256L
Review Comment:
Already auto-adopted by Copilot Autofix in 8ef8fd4153 — `CAP` is now
`private[execution]`, which makes it package-visible to the benchmark and
tests. (Note: the original Autofix commit b9d0a14791 had a stray closing brace
that I fixed in place via `git rebase -i ... edit` while preserving the
Co-authored-by trailer.)
##########
benchmarks/ColumnarTableCachePartitionStatsBenchmark-results.txt:
##########
@@ -1,28 +1,162 @@
-OpenJDK 64-Bit Server VM 17.0.18+8-Ubuntu-124.04.1 on Linux
6.6.87.2-microsoft-standard-WSL2
+OpenJDK 64-Bit Server VM 17.0.18+8 on Linux 6.6.87.2-microsoft-standard-WSL2
AMD EPYC 7763 64-Core Processor
table cache build: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-partitionStats off 126425 138565
10546 0.8 1264.3 1.0X
-partitionStats on 131431 137094
7581 0.8 1314.3 1.0X
+partitionStats off 127107 127856
668 0.8 1271.1 1.0X
+partitionStats on 134398 146067
10193 0.7 1344.0 0.9X
Review Comment:
This appears to be a false positive — the file itself does not have `<N>|`
prefixes (xxd confirms byte 0 = 'O' from 'OpenJDK 64-Bit Server VM'). The
leading numbers you see may be the GitHub diff view's line-number gutter being
misread as file content. No change needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]