yaooqinn commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3247613389


##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
   public native long deserializeDirect(long serializerHandle, long offset, int len);
 
   public native void close(long serializerHandle);
+
+  // Capability check for the serializeWithStats JNI extension. A new Gluten jar paired with an
+  // older libgluten.so may lack this symbol; the cache write path consults this helper and
+  // falls back to the legacy serialize() when false. Probing is lazy and one-shot: reflection
+  // on the declared native method proves the JVM side is wired (the cpp symbol is verified at
+  // first real invocation; callers must catch UnsatisfiedLinkError there too).
+  private static volatile Boolean supportsStatsExtCached = null;
+
+  public static boolean supportsStatsExt() {
+    Boolean cached = supportsStatsExtCached;
+    if (cached != null) {
+      return cached;
+    }
+    boolean result;
+    try {
+      ColumnarBatchSerializerJniWrapper.class.getDeclaredMethod("serializeWithStats", long.class);
+      result = true;
+    } catch (NoSuchMethodException e) {
+      result = false;
+    }
+    supportsStatsExtCached = result;
+    return result;

Review Comment:
   Addressed in `9145670865` (callsite-fallback). Thank you for the advice!



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the
+            // partition-stats conf is enabled. Capability is cached after first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+            // to the original serialize() path and emit stats=null; the buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+              val framed = jni.serializeWithStats(handle)
+              // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+              // dispatch by dataType.
+              val structSchema = StructType(
+                schema.map(
+                  a =>
+                    StructField(a.name, a.dataType, a.nullable)))
+              val (stats, bytesBlob) =
+                CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+              CachedColumnarBatch(batch.numRows(), bytesBlob.length, bytesBlob, stats, structSchema)
+            } else {
+              val unsafeBuffer = jni.serialize(handle)
+              val bytes = unsafeBuffer.toByteArray
+              CachedColumnarBatch(
+                batch.numRows(),
+                bytes.length,
+                bytes,
+                stats = null,
+                schema = null)
+            }

Review Comment:
   Addressed in `9145670865` (callsite-fallback). Thank you for the advice!



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
       length != Kryo.NULL,
       "The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
         s"deserialize using ${this.getClass.getName}")
-    val bytes = new Array[Byte](length - 1) // -1 to restore
+    val bytes = new Array[Byte](length - 1)
     input.readBytes(bytes)
-    CachedColumnarBatch(numRows, sizeInBytes, bytes)
+    val hasStats = input.readBoolean()
+    // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
+    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed
+    // pattern match throws MatchError at runtime.
+    val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
+      val statsLen = input.readInt()
+      val statsBytes = new Array[Byte](statsLen)
+      input.readBytes(statsBytes)
+      val sch = readOptionalSchema(input)
+      (CachedColumnarBatchKryoSerializer.deserializeStats(statsBytes, sch), sch)
+    } else {
+      (null, readOptionalSchema(input))
+    }
+    CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, statsAndSchema._2)
+  }
+
+  private def readOptionalSchema(input: Input): StructType = {
+    if (!input.readBoolean()) {
+      null
+    } else {
+      val schemaLen = input.readInt()
+      val schemaBytes = new Array[Byte](schemaLen)
+      input.readBytes(schemaBytes)
+      DataType.fromJson(new String(schemaBytes, UTF_8)).asInstanceOf[StructType]
+    }

Review Comment:
   Addressed in `491070bf34` (kryo-bounds-and-v1-back-compat). Thank you for 
the advice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to