zhouyuan commented on code in PR #12211:
URL: https://github.com/apache/gluten/pull/12211#discussion_r3480944701
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -1041,4 +1260,86 @@ object ColumnarCachedBatchSerializer extends Logging {
)
}
}
+
+ // Visible for testing: reset the capability flag so a unit test can
re-exercise the
+ // probe-once semantics.
+ private[execution] def resetStatsExtAvailableForTesting(): Unit = {
+ statsExtAvailableFlag = true
+ }
+
+ // V3 lazy deserialization support
+
+ // Separate capability latch for the V3 JNI symbols
+ // (framedSerializeV3 / framedSerializeWithStatsV3).
+ @volatile private var statsExtV3AvailableFlag: Boolean = true
+
+ def statsExtV3Available: Boolean = statsExtV3AvailableFlag
+
+ // Benchmark-only hook used by ColumnarTableCacheLazyDeserBenchmark to
materialize the old
+ // eager/raw cache bytes as a baseline. This is intentionally
package-private and not a user
+ // configuration: V3 lazy deserialization remains the production default.
+ private[execution] def withStatsExtV3AvailabilityForBenchmark[T](available:
Boolean)(
+ f: => T): T = synchronized {
+ val previous = statsExtV3AvailableFlag
+ statsExtV3AvailableFlag = available
+ try {
+ f
+ } finally {
+ statsExtV3AvailableFlag = previous
+ }
+ }
+
+ def markStatsExtV3Unavailable(cause: Throwable): Unit = {
+ if (statsExtV3AvailableFlag) {
+ statsExtV3AvailableFlag = false
+ logWarning(
+ "V3 table cache serialization JNI path is unavailable; " +
+ "disabling V3 per-column lazy deserialization for this JVM. " +
+ "This typically indicates a Gluten jar / native library version
mismatch.",
+ cause
+ )
+ }
+ }
+
+ // V3 per-batch serialization: identical two-arm catch structure to
serializeOneBatchWithStats.
+ // null return from JNI = non-Velox backend; treated as one-shot latch, not
corrupt frame.
+ private[execution] def serializeOneBatchV3(
+ jni: ColumnarBatchSerializerJniWrapper,
+ handle: Long,
+ numRows: Int,
+ structSchema: StructType,
+ includeStats: Boolean,
+ fallbackToV2OrLegacy: () => CachedBatch): CachedBatch = {
+ try {
+ val framed =
+ if (includeStats) jni.serializeWithStatsV3(handle)
+ else jni.serializeV3(handle)
+ if (framed == null) {
+ // Non-Velox backend returns null; set latch and fall back.
+ markStatsExtV3Unavailable(
+ new RuntimeException("framedSerializeV3 returned null (backend not
supported)"))
+ return fallbackToV2OrLegacy()
+ }
+ CachedColumnarBatchKryoSerializer.requireV3FrameNumRows(
+ framed,
+ numRows,
+ "serialize V3 cached batch",
+ expectedNumCols = structSchema.length)
+ val (stats, _) =
CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
Review Comment:
Both `requireV3FrameNumRows` and `parseFramedBytes` call
`parseV3FrameInternal`, so it would be better to remove the redundant call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]