yaooqinn commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3247815459
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
+ // partition-stats conf is enabled. Capability is cached after first probe. When
+ // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+ // to the original serialize() path and emit stats=null; the buildFilter wrapper
+ // directs such batches through without pruning.
+ val partitionStatsEnabled =
+ GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+ if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
Review Comment:
Addressed in 3f9c923644. Added a new E2E test, `partitionStats.enabled=false:
legacy serialize() path correctness preserved`, to `ColumnarCachedBatchE2ESuite`.
It wraps `cacheRange()` in a `withSQLConf` block that sets the config to its
production default `false`, materializes the cache, and verifies the equality
filter result. This exercises the disabled branch of the gate end-to-end, so any
regression that silently activates stats or breaks the stats=null read path
surfaces here. Suite passes 12/12 locally (11 pre-existing + 1 new). Thanks for
catching the gap.
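
For readers following the thread, the gate that this test covers can be modeled roughly as below. This is a simplified sketch, not the Gluten implementation: `StatsGateSketch`, `Cached`, and the three functions are hypothetical stand-ins, and a `(min, max)` pair over integers stands in for the real stats blob.

```scala
// Hypothetical, self-contained model of the write-path gate discussed above.
// None of these types are the real Gluten classes.
object StatsGateSketch {
  // Stand-in for a cached batch; stats is None on the legacy path.
  final case class Cached(numRows: Int, bytes: Array[Byte], stats: Option[(Int, Int)])

  // Legacy path: payload only, no stats attached.
  def serializeLegacy(rows: Seq[Int]): Cached =
    Cached(rows.length, rows.map(_.toByte).toArray, None)

  // Stats path: same payload plus (min, max) of the batch, if it has rows.
  def serializeWithStats(rows: Seq[Int]): Cached =
    serializeLegacy(rows).copy(
      stats = if (rows.isEmpty) None else Some((rows.min, rows.max)))

  // The gate: the conf flag AND the native capability must both be true.
  def serialize(rows: Seq[Int], statsEnabled: Boolean, nativeSupportsStats: Boolean): Cached =
    if (statsEnabled && nativeSupportsStats) serializeWithStats(rows)
    else serializeLegacy(rows)
}
```

With the flag off, `StatsGateSketch.serialize(rows, statsEnabled = false, nativeSupportsStats = true)` yields a batch with no stats, which is the branch the new E2E test exercises.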
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
Review Comment:
Thanks -- this is intentional and won't be addressed in this PR. The config
is scoped to the write path by design (see design D-A4 trade-off in the PR
description): stats already embedded in cached batches should be honored,
otherwise toggling the config mid-session would leave behind un-prunable cached
data with no way to drop the stats blob. The existing is the user-facing way
to drop cached stats. Behavior is correct (using available stats is never
wrong), and no current user could rely on the read path being gated since the
feature is new. Adding a read-path kill switch is tracked as a future
possibility (covered by the same on/off SQLConf if demand emerges).
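
To illustrate the write-path-only scoping, here is a minimal sketch of a read path that honors whatever stats a batch carries and never consults the config. The names (`ReadPathSketch`, `Cached`, `mayContain`, `prune`) are hypothetical, and integer `(min, max)` bounds stand in for the real stats format.

```scala
// Hypothetical model of a read path that is not gated by the config.
object ReadPathSketch {
  // Stand-in for a cached batch; stats is None if written with the conf off.
  final case class Cached(id: Int, stats: Option[(Int, Int)])

  // For an equality predicate `col = value`, keep a batch unless its stats
  // prove that no row can match. There is deliberately no config check here.
  def mayContain(batch: Cached, value: Int): Boolean = batch.stats match {
    case Some((min, max)) => min <= value && value <= max
    case None             => true // no stats: pass through, never pruned
  }

  def prune(batches: Seq[Cached], value: Int): Seq[Cached] =
    batches.filter(mayContain(_, value))
}
```

Batches written while the config was off simply carry no stats and are always kept, so toggling the config mid-session changes only what future writes embed.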
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
+ // partition-stats conf is enabled. Capability is cached after first probe. When
+ // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+ // to the original serialize() path and emit stats=null; the buildFilter wrapper
+ // directs such batches through without pruning.
+ val partitionStatsEnabled =
+ GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+ if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+ val framed = jni.serializeWithStats(handle)
+ // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+ // dispatch by dataType.
+ val structSchema = StructType(
+ schema.map(
+ a =>
+ StructField(a.name, a.dataType, a.nullable)))
+ val (stats, bytesBlob) =
+ CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+ CachedColumnarBatch(batch.numRows(), bytesBlob.length, bytesBlob, stats, structSchema)
Review Comment:
Addressed in 3f9c923644. Hoisted the
`StructType(schema.map(StructField(...)))` allocation out of the per-batch
`Iterator.next()` body into a single iterator-scoped val. The schema is constant
for the lifetime of a partition iterator, so rebuilding it per batch was wasted
allocation and GC pressure, especially in the many-small-batch case you noted.
Thank you!
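
The hoisting pattern can be sketched in isolation as follows. This is an illustrative model only: `HoistSketch`, `Field`, `Schema`, `buildSchema`, and the `schemaBuilds` counter are hypothetical and stand in for Spark's `StructType` construction; the counter exists purely to make the allocation count observable.

```scala
// Hypothetical model of hoisting a per-batch allocation to iterator scope.
object HoistSketch {
  final case class Field(name: String, nullable: Boolean)
  final case class Schema(fields: Seq[Field])

  // Counts schema constructions, for illustration only.
  var schemaBuilds = 0

  def buildSchema(attrs: Seq[(String, Boolean)]): Schema = {
    schemaBuilds += 1
    Schema(attrs.map { case (name, nullable) => Field(name, nullable) })
  }

  // Emits (row count, schema) per input batch.
  def cachedBatches(
      attrs: Seq[(String, Boolean)],
      batches: Iterator[Seq[Int]]): Iterator[(Int, Schema)] =
    new Iterator[(Int, Schema)] {
      // Built once when the iterator is created, not once per next() call.
      private val structSchema = buildSchema(attrs)
      override def hasNext: Boolean = batches.hasNext
      override def next(): (Int, Schema) = (batches.next().length, structSchema)
    }
}
```

Every batch produced by one iterator shares the same schema instance, so the number of schema allocations no longer scales with the number of batches.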
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]