This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7bfd45ca98 [VL] Stop using Input.available() to probe trailing markers
in CachedColumnarBatch (#12147)
7bfd45ca98 is described below
commit 7bfd45ca983eeedafc59fddb01e78ea2905cf816
Author: Kent Yao <[email protected]>
AuthorDate: Thu May 28 11:11:07 2026 +0800
[VL] Stop using Input.available() to probe trailing markers in
CachedColumnarBatch (#12147)
ColumnarCachedBatchSerializer.read guarded the trailing hasStats /
hasSchema booleans with `input.available() > 0` to tolerate the V1
wire format that predates those markers. The intent was correct --
the existing ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks
absent-trailing as silent null, and that contract must be preserved
-- but `Input.available()` is the wrong probe.
Kryo's `Input.available()` returns
`(limit - position) + underlyingStream.available()`, and the JDK
`InputStream.available()` contract permits any implementation to
return 0 even when more data follows -- BufferedInputStream over
shuffle-spill / network chunk boundaries routinely does so. When the
Kryo buffer is drained AND the underlying stream reports 0 at the
trailing-boolean byte position, the probe falsely concludes EOF,
skips hasStats, and the next `readClassAndObject` interprets the
stats payload (which contains the schema JSON) as a class name --
surfacing as `ClassNotFoundException: {"type":"struct",...}` with
the stack topped by `DefaultClassResolver.readName`.
Replace the probe with `try input.readBoolean() catch { ... }` that
catches only Kryo's "Buffer underflow" KryoException. This detects the
real EOF when a V1 wire has no trailing booleans (preserving the
silent-null contract) without ever consulting `available()`, so a V2
wire read under chunked fill always reads its trailing markers correctly.
The catch is intentionally narrow (message-prefix match on
"Buffer underflow") so that genuine corruption -- including
ClassNotFoundException wrapped during readClassAndObject -- is
never swallowed.
The length-bound `require(... <= maxLen ...)` guard from commit
491070bf34 (defending against NegativeArraySizeException /
oversized allocation) is preserved -- that part is orthogonal to
the V1 probe and remains useful.
A new test ColumnarCachedBatchKryoBoundaryProbeBugSuite locks the
chunked-fill probe contract: a 1-byte-per-read InputStream that
returns `available() == 0` must still round-trip multi-batch V2
wire correctly. The existing V1-wire silent-null test in
ColumnarCachedBatchKryoSuite continues to pass unchanged.
---
.../execution/ColumnarCachedBatchSerializer.scala | 50 ++++++++--
...umnarCachedBatchKryoBoundaryProbeBugSuite.scala | 111 +++++++++++++++++++++
2 files changed, 150 insertions(+), 11 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 7bf7a17546..bb548b2f9c 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
-import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer =>
KryoSerializer}
import com.esotericsoftware.kryo.DefaultSerializer
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.arrow.c.ArrowSchema
@@ -152,14 +152,30 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
     )
     val bytes = new Array[Byte](payloadLen)
     input.readBytes(bytes)
-    // Backward-compat with the V1 wire format (no trailing hasStats / hasSchema booleans):
-    // legacy CachedColumnarBatch instances persisted on disk (DISK_ONLY / MEMORY_AND_DISK)
-    // surviving a rolling upgrade lack these fields. available() is best-effort -- treats
-    // unavailable suffix as "absent" instead of throwing KryoException.
-    val hasStats = input.available() > 0 && input.readBoolean()
-    // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
-    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed
-    // pattern match throws MatchError at runtime.
+    // Read the trailing hasStats marker. Catching a Buffer-underflow KryoException
+    // here preserves backward compatibility with the V1 wire format (no trailing
+    // hasStats / hasSchema booleans), which the existing
+    // ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks as a contract:
+    // an absent trailing byte must read as null, not throw.
+    // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
+    //
+    // Why a try/catch instead of `input.available() > 0 && readBoolean`:
+    // Kryo `Input.available()` returns `(limit - position) + underlyingStream.available()`,
+    // and the JDK `InputStream.available()` contract permits any implementation to
+    // return 0 even when more data follows -- BufferedInputStream over shuffle-spill
+    // / network chunk boundaries routinely does so. When the Kryo buffer is drained
+    // AND the underlying stream reports 0 at the trailing-boolean byte position, the
+    // probe falsely concludes EOF, skips hasStats, and the next readClassAndObject
+    // interprets the stats payload (which contains the schema JSON) as a class name --
+    // surfacing as `ClassNotFoundException: {"type":"struct",...}` with the stack
+    // topped by `DefaultClassResolver.readName`. A try/catch on the real EOF surface
+    // (Kryo "Buffer underflow") avoids the false-EOF probe while still tolerating V1 wire.
+    //
+    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the
+    // typed pattern match throws MatchError at runtime.
+    val hasStats =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
     val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
       val statsLen = input.readInt()
       require(
@@ -177,9 +193,21 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
     CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, statsAndSchema._2)
   }

+  // Kryo signals end-of-input by throwing KryoException with a message starting
+  // with "Buffer underflow". There is no dedicated subclass, so a message-prefix
+  // check is the narrowest filter we can apply without swallowing real corruption
+  // (e.g. ClassNotFoundException wrapped during readClassAndObject).
+  private def isBufferUnderflow(e: KryoException): Boolean = {
+    // getMessage may be null; a message-less exception is never treated as underflow.
+    Option(e.getMessage).exists(_.startsWith("Buffer underflow"))
+  }
+
   private def readOptionalSchema(input: Input, maxLen: Long): StructType = {
-    // Treat absent trailing bytes as "no schema": V1 wire format predates this field.
-    if (input.available() <= 0 || !input.readBoolean()) {
+    // Trailing hasSchema marker; same V1-vs-chunked-fill rationale as the hasStats probe in read.
+    val hasSchema =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
+    if (!hasSchema) {
       null
     } else {
       val schemaLen = input.readInt()
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
new file mode 100644
index 0000000000..15bbcb3f84
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+
+/**
+ * Deterministic repro for the Input.available() probe bug on trailing hasStats/hasSchema markers.
+ *
+ * Trigger conditions (all required):
+ *   (1) Multi-batch deserialize via kryo.readClassAndObject from one stream.
+ *   (2) Kryo Input wraps an InputStream (not byte[]).
+ *   (3) At a batch's trailing hasStats/hasSchema position, the underlying
+ *       InputStream returns available()=0 AND the Kryo Input buffer is drained
+ *       (limit==position). Both conditions must hit the SAME byte position.
+ *
+ * Production path where this was observed:
+ *   BufferedInputStream over shuffle-spill / network ManagedBuffer chunk
+ *   boundary -> stream.available()=0 between chunks, Kryo Input.available()
+ *   = (limit-pos) + 0 -> reads 0 when buffer drained.
+ *
+ * Fixture: 1-byte-per-read stream + lying available()=0 -> every byte boundary
+ * satisfies (3); any trailing-boolean byte aligned with a Kryo refill triggers
+ * the false-EOF.
+ */
+class ColumnarCachedBatchKryoBoundaryProbeBugSuite extends AnyFunSuite {
+
+  final private class LyingOneByteStream(src: InputStream) extends InputStream {
+    override def read(): Int = src.read()
+    override def read(b: Array[Byte], off: Int, len: Int): Int = {
+      if (len == 0) 0
+      else {
+        val c = src.read()
+        if (c == -1) -1
+        else {
+          b(off) = c.toByte
+          1
+        }
+      }
+    }
+    override def available(): Int = 0
+  }
+
+  private def mkBatch(i: Int): CachedColumnarBatch = {
+    // PartitionStatistics per-column slots:
+    //   [lower(typed) upper(typed) count(Int) nullCount(Int) sizeBytes(Long)]
+    val stats: InternalRow =
+      new GenericInternalRow(Array[Any](i.toLong, (i * 10).toLong, i, 0, 8L))
+    val schema = StructType(Seq(StructField(s"col$i", LongType, nullable = true)))
+    val bytes = Array.fill[Byte](128)(i.toByte)
+    CachedColumnarBatch(
+      numRows = i,
+      sizeInBytes = bytes.length.toLong,
+      bytes = bytes,
+      stats = stats,
+      schema = schema)
+  }
+
+  test("multi-batch deserialize survives boundary-aligned trailing-boolean probe") {
+    val kryo = new Kryo()
+    val ser = new CachedColumnarBatchKryoSerializer()
+    kryo.register(classOf[CachedColumnarBatch], ser)
+
+    val baos = new ByteArrayOutputStream()
+    val out = new Output(baos)
+    val originals = (1 to 10).map(mkBatch)
+    originals.foreach(b => kryo.writeClassAndObject(out, b))
+    out.close()
+
+    val raw = baos.toByteArray
+    val in = new Input(new LyingOneByteStream(new ByteArrayInputStream(raw)), 32)
+
+    val read = (1 to 10).map(_ => kryo.readClassAndObject(in).asInstanceOf[CachedColumnarBatch])
+    in.close()
+
+    originals.zip(read).zipWithIndex.foreach {
+      case ((o, r), i) =>
+        info(s"batch $i: orig.stats=${o.stats != null} schema=${o.schema}")
+        info(s"batch $i: read.stats=${r.stats != null} schema=${r.schema}")
+        assert(r.numRows == o.numRows, s"batch $i numRows mismatch")
+        assert(r.bytes.toSeq == o.bytes.toSeq, s"batch $i bytes mismatch")
+        assert(r.stats != null, s"batch $i stats lost (BUG)")
+        assert(r.schema == o.schema, s"batch $i schema mismatch (BUG)")
+    }
+  }
+
+  // V1 wire backward-compat is locked by ColumnarCachedBatchKryoSuite#"V1 wire ..."
+  // -- not duplicated here. This suite only covers the chunked-fill probe path.
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]