This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f848a40f9a3a6f7d0246d4e77fe585379405d02d Author: voonhous <[email protected]> AuthorDate: Wed May 20 15:46:33 2026 +0800 feat(blob): default blob.inline.mode to DESCRIPTOR for Lance (#18744) Co-authored-by: Rahil Chertara <[email protected]> Co-authored-by: Y Ethan Guo <[email protected]> --- .../hudi/common/config/HoodieReaderConfig.java | 10 +- .../spark/sql/hudi/blob/BatchedBlobReader.scala | 35 +- .../spark/sql/hudi/blob/ScalarFunctions.scala | 4 + .../hudi/functional/TestLanceDataSource.scala | 479 ++++++++++++++++++++- rfc/rfc-100/rfc-100.md | 103 +++-- 5 files changed, 561 insertions(+), 70 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java index 942d1aeabb50..9cbab8f4468c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java @@ -107,13 +107,13 @@ public class HoodieReaderConfig extends HoodieConfig { public static final String BLOB_INLINE_READ_MODE_DESCRIPTOR = "DESCRIPTOR"; public static final ConfigProperty<String> BLOB_INLINE_READ_MODE = ConfigProperty .key("hoodie.read.blob.inline.mode") - .defaultValue(BLOB_INLINE_READ_MODE_CONTENT) + .defaultValue(BLOB_INLINE_READ_MODE_DESCRIPTOR) .markAdvanced() .sinceVersion("1.2.0") .withValidValues(BLOB_INLINE_READ_MODE_CONTENT, BLOB_INLINE_READ_MODE_DESCRIPTOR) .withDocumentation("How Hudi interprets INLINE BLOB values on read. " - + "CONTENT (default) returns the raw inline bytes. 
" - + "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at the backing " - + "Lance file with the INLINE payload's position and size, so callers can defer " - + "the byte read via read_blob()."); + + "DESCRIPTOR (default) returns an OUT_OF_LINE-shaped reference pointing at the " + + "backing Lance file with the INLINE payload's position and size, so callers can " + + "skip the byte content read. " + + "CONTENT returns the raw inline bytes directly in the data field on every read."); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala index a1c299cf2611..419fb2c8a8d5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala @@ -208,16 +208,31 @@ class BatchedBlobReader( // Dispatch based on storage_type (field 0) val storageType = accessor.getString(blobStruct, 0) if (storageType == HoodieSchema.Blob.INLINE) { - // Case 1: Inline — bytes are in field 1 - val bytes = accessor.getBytes(blobStruct, 1) - batch += RowInfo[R]( - originalRow = row, - filePath = "", - offset = -1, - length = -1, - index = rowIndex, - inlineBytes = Some(bytes) - ) + // INLINE + CONTENT: inline_data is populated; return bytes directly (1-hop). + // INLINE + DESCRIPTOR: inline_data is null and the scan synthesized a + // reference pointing into the backing file's storage layout. We refuse to + // materialize bytes here — DESCRIPTOR is a metadata-only mode for INLINE + // rows, and the synthesized reference is an internal pointer, not + // user-facing storage info. Callers must switch to CONTENT mode or stop + // using read_blob() on INLINE columns under DESCRIPTOR. 
+ if (!accessor.isNullAt(blobStruct, 1)) { + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + throw new IllegalStateException( + s"read_blob() cannot materialize bytes for an INLINE blob under " + + s"DESCRIPTOR mode. Under hoodie.read.blob.inline.mode=DESCRIPTOR, " + + s"INLINE blobs are returned as metadata-only (inline_data=NULL, " + + s"synthesized reference). To read bytes, set " + + s"hoodie.read.blob.inline.mode=CONTENT") + } } else if (storageType == HoodieSchema.Blob.OUT_OF_LINE) { // Case 2 or 3: Out-of-line — get reference struct (field 2) require(!accessor.isNullAt(blobStruct, 2), s"Out-of-line blob at row $rowIndex must set reference") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala index bf94b4519c33..56e7b3bac096 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala @@ -81,6 +81,10 @@ object ScalarFunctions { |Returns: | Binary data read from the file | + |Caveat: + | Throws on INLINE rows under hoodie.read.blob.inline.mode=DESCRIPTOR. + | Set CONTENT mode to materialize INLINE bytes. 
+ | |Performance: | - Configure batching: hoodie.blob.batching.max.gap.bytes (default 4096) | - Configure lookahead: hoodie.blob.batching.lookahead.size (default 50) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 093b6ee30873..abce02568633 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -37,9 +37,10 @@ import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ -import org.junit.jupiter.api.{AfterEach, BeforeEach} -import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertNotNull, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertNotNull, assertThrows, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource} import org.lance.file.LanceFileReader @@ -869,8 +870,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { // Writer-side: prove the bytes actually routed through Lance's dedicated blob writer. assertLanceBlobEncoding(tablePath) - // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`. - val readRows = spark.read.format("hudi").load(tablePath) + // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`. 
Set the mode + // explicitly — the default is DESCRIPTOR, which would surface a reference instead. + val readRows = spark.read.format("hudi") + .option("hoodie.read.blob.inline.mode", "CONTENT") + .load(tablePath) .select($"id", $"payload") .orderBy($"id") .collect() @@ -894,9 +898,13 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } } - // read_blob() resolution path: INLINE payloads resolve to the same bytes. + // read_blob() resolution path: INLINE payloads resolve to the same bytes. CONTENT is set + // explicitly here — under the DESCRIPTOR default, read_blob() throws for INLINE rows. val viewName = s"${tableName}_view" - spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName) + spark.read.format("hudi") + .option("hoodie.read.blob.inline.mode", "CONTENT") + .load(tablePath) + .createOrReplaceTempView(viewName) val materialized = spark.sql( s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect() assertEquals(numRows, materialized.length) @@ -917,8 +925,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { * DESCRIPTOR mode on INLINE rows: user writes `data` bytes; on read with * `hoodie.read.blob.inline.mode=DESCRIPTOR` each row comes back with type still set to * {@code INLINE} (preserving the original storage mode) but with {@code data=null} and a - * populated {@code reference} pointing at the Lance file. {@code read_blob()} then preads - * the bytes back from the .lance file via the reference. + * populated synthesized {@code reference} pointing at the Lance file. The synthesized + * reference is an internal pointer, not user-facing storage. {@code read_blob()} is + * therefore unsupported on INLINE rows in this mode and must throw a clear error so + * callers don't conflate the synthesized pointer with durable metadata. 
*/ @ParameterizedTest @EnumSource(value = classOf[HoodieTableType]) @@ -978,17 +988,458 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { s"synthetic reference to the Lance file should be flagged managed (id=$i)") } - // read_blob() materializes bytes via BatchedBlobReader, which always reads with CONTENT - // mode (actual bytes) regardless of the user's inline read mode setting. + // read_blob() on INLINE rows under DESCRIPTOR mode is unsupported by design: DESCRIPTOR + // is metadata-only and the synthesized reference is an internal pointer into the .lance + // file's storage layout, not user-facing metadata. BatchedBlobReader must throw with a + // message that names both INLINE and DESCRIPTOR so the failure is actionable. val viewName = s"${tableName}_view" - spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName) - val materialized = spark.sql( + spark.read.format("hudi") + .option(modeKey, "DESCRIPTOR") + .load(tablePath) + .createOrReplaceTempView(viewName) + val ex = assertThrows(classOf[Throwable], new Executable { + override def execute(): Unit = { + spark.sql(s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect() + } + }) + val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ != null) + .flatMap(t => Option(t.getMessage)).mkString(" | ") + assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"), + s"error must mention INLINE and DESCRIPTOR; got: $msgChain") + } + + /** + * Mixed-storage table on Lance: one blob column holds both INLINE rows (small payloads + * stored inline) and OUT_OF_LINE rows (external file references). Under CONTENT mode, + * read_blob() must materialize the correct bytes for both shapes in a single query — + * INLINE rows go through the 1-hop inline_data passthrough, OUT_OF_LINE rows go through + * the external pread (with BatchedBlobReader merging consecutive ranges). 
+ */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBlobMixedInlineAndOutOfLineContentMode(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_blob_mixed_content_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + val payloadLen = 256 + val numInline = 3 + val numOutOfLine = 3 + val externalFileSize = numOutOfLine * payloadLen + val externalDir = Files.createDirectories( + Paths.get(s"$basePath/_blob_ext_mixed_${tableType.name().toLowerCase}")) + val extPath = BlobTestHelpers.createTestFile(externalDir, "mixed_file.bin", externalFileSize) + + val inlinePayloads: Seq[Array[Byte]] = (0 until numInline).map { i => + (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray + } + + val sparkSess = spark + import sparkSess.implicits._ + val inlineDf = inlinePayloads.zipWithIndex.map { case (b, i) => (i, b) } + .toDF("id", "bytes") + .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes")) + + val outOfLineDf = (0 until numOutOfLine).map { k => + (numInline + k, extPath, (k * payloadLen).toLong, payloadLen.toLong) + }.toDF("id", "path", "offset", "length") + .select($"id", BlobTestHelpers.blobStructCol("payload", $"path", $"offset", $"length")) + + val canonicalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, + BlobTestHelpers.blobMetadata) + )) + val raw = inlineDf.unionByName(outOfLineDf) + val df = spark.createDataFrame(raw.rdd, canonicalSchema) + + writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite, + operation = Some("bulk_insert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id")) + + assertLanceBlobEncoding(tablePath) + + val viewName = s"${tableName}_mixed_view" + spark.read.format("hudi") + .option("hoodie.read.blob.inline.mode", "CONTENT") + .load(tablePath) + .createOrReplaceTempView(viewName) + + val rows = spark.sql( 
s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect() + assertEquals(numInline + numOutOfLine, rows.length) + + rows.foreach { row => + val id = row.getInt(row.fieldIndex("id")) + val bytes = row.getAs[Array[Byte]]("bytes") + if (id < numInline) { + assertArrayEquals(inlinePayloads(id), bytes, + s"INLINE row: read_blob() bytes mismatch (id=$id)") + } else { + val k = id - numInline + assertEquals(payloadLen, bytes.length, + s"OUT_OF_LINE row: read_blob() length mismatch (id=$id)") + BlobTestHelpers.assertBytesContent(bytes, expectedOffset = k * payloadLen) + } + } + } + + /** + * Shared writer for multi-blob-column INLINE tests: writes a table with two INLINE blob + * columns ({@code payload_a}, {@code payload_b}) and returns the table path plus the + * payloads written for each column so individual tests can assert on read. + * + * Distinct byte patterns are used per column so a column-swap regression would surface + * immediately as a byte mismatch rather than silently passing. 
+ */ + private def writeMultiBlobInlineTable( + tableType: HoodieTableType, + tableName: String, + numRows: Int = 4, + payloadLen: Int = 512): (String, Seq[Array[Byte]], Seq[Array[Byte]]) = { + val tablePath = s"$basePath/$tableName" + val payloadsA: Seq[Array[Byte]] = (0 until numRows).map { i => + (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray + } + val payloadsB: Seq[Array[Byte]] = (0 until numRows).map { i => + (0 until payloadLen).map(j => ((i + j + 128) % 256).toByte).toArray + } + val sparkSess = spark + import sparkSess.implicits._ + + val baseDf = (0 until numRows).map(i => (i, payloadsA(i), payloadsB(i))) + .toDF("id", "bytes_a", "bytes_b") + val rawDf = baseDf.select( + $"id", + BlobTestHelpers.inlineBlobStructCol("payload_a", $"bytes_a"), + BlobTestHelpers.inlineBlobStructCol("payload_b", $"bytes_b")) + val canonicalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload_a", BlobType().asInstanceOf[StructType], nullable = true, + BlobTestHelpers.blobMetadata), + StructField("payload_b", BlobType().asInstanceOf[StructType], nullable = true, + BlobTestHelpers.blobMetadata) + )) + val df = spark.createDataFrame(rawDf.rdd, canonicalSchema) + + writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite, + operation = Some("bulk_insert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id")) + assertLanceBlobEncoding(tablePath) + (tablePath, payloadsA, payloadsB) + } + + /** + * Plain struct projection across two INLINE blob columns: {@code SELECT payload_a, payload_b + * FROM table}. The single-column shape is already pinned by {@code testBlobInlineRoundTrip} + * (CONTENT) and {@code testBlobInlineDescriptorMode} (DESCRIPTOR); this test only asserts the + * per-column-independence properties that are unique to the multi-column case: + * + * - CONTENT: each column's {@code data} carries its own written bytes (distinct byte + * patterns per column rule out cross-column aliasing). 
+ * - DESCRIPTOR (default): both columns independently get {@code data=null} and a populated + * synthesized {@code reference} — i.e. the synthesis fires per-column, not just on the + * first blob column. + */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBlobInlineMultipleColumnsPlainSelect(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_blob_multi_plain_${tableType.name().toLowerCase}" + val payloadLen = 512 + val numRows = 4 + val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable( + tableType, tableName, numRows, payloadLen) + val sparkSess = spark + import sparkSess.implicits._ + val modeKey = "hoodie.read.blob.inline.mode" + + // CONTENT: per-column bytes must not alias — payload_a carries payloadsA, payload_b carries + // payloadsB. The byte patterns differ by construction (see writeMultiBlobInlineTable). + val contentRows = spark.read.format("hudi") + .option(modeKey, "CONTENT") + .load(tablePath) + .select($"id", $"payload_a", $"payload_b") + .orderBy($"id") + .collect() + assertEquals(numRows, contentRows.length) + contentRows.zipWithIndex.foreach { case (row, i) => + val a = row.getStruct(row.fieldIndex("payload_a")) + val b = row.getStruct(row.fieldIndex("payload_b")) + assertArrayEquals(payloadsA(i), a.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD), + s"payload_a: bytes must match written payloadsA under CONTENT (id=$i)") + assertArrayEquals(payloadsB(i), b.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD), + s"payload_b: bytes must match written payloadsB under CONTENT (id=$i)") + } + + // DESCRIPTOR (default): descriptor synthesis must fire per-column. data=null and a + // populated reference on BOTH columns is the property that distinguishes this from the + // single-column DESCRIPTOR test. 
+ val descRows = spark.read.format("hudi") + .load(tablePath) + .select($"id", $"payload_a", $"payload_b") + .orderBy($"id") + .collect() + assertEquals(numRows, descRows.length) + descRows.foreach { row => + val id = row.getInt(row.fieldIndex("id")) + Seq("payload_a", "payload_b").foreach { col => + val payload = row.getStruct(row.fieldIndex(col)) + assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)), + s"$col: data should be null under DESCRIPTOR (id=$id)") + assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)), + s"$col: reference should be populated under DESCRIPTOR (id=$id)") + } + } + } + + /** + * Materializing both INLINE blob columns via {@code read_blob()} in a single query under + * CONTENT mode: {@code SELECT read_blob(payload_a), read_blob(payload_b) FROM table}. Each + * column must resolve via the 1-hop {@code inline_data} passthrough with its own bytes — + * distinct byte patterns per column would surface a cross-column aliasing regression. + * + * The DESCRIPTOR-mode failure path for {@code read_blob()} on INLINE rows is pinned by + * {@code testBlobInlineDescriptorMode} (single column) and {@code + * testBlobInlineMultipleColumnsMixedSelect} (one read_blob + one struct projection); it is + * not re-asserted here. 
+ */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBlobInlineMultipleColumnsReadBlobAll(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_blob_multi_readblob_${tableType.name().toLowerCase}" + val numRows = 4 + val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable( + tableType, tableName, numRows) + val modeKey = "hoodie.read.blob.inline.mode" + + val contentView = s"${tableName}_content_view" + spark.read.format("hudi") + .option(modeKey, "CONTENT") + .load(tablePath) + .createOrReplaceTempView(contentView) + val materialized = spark.sql( + s"SELECT id, read_blob(payload_a) AS bytes_a, read_blob(payload_b) AS bytes_b " + + s"FROM $contentView ORDER BY id").collect() assertEquals(numRows, materialized.length) materialized.zipWithIndex.foreach { case (row, i) => + assertEquals(i, row.getInt(row.fieldIndex("id"))) + assertArrayEquals(payloadsA(i), row.getAs[Array[Byte]]("bytes_a"), + s"read_blob(payload_a) should match under CONTENT (id=$i)") + assertArrayEquals(payloadsB(i), row.getAs[Array[Byte]]("bytes_b"), + s"read_blob(payload_b) should match under CONTENT (id=$i)") + } + } + + /** + * Mixed projection across two INLINE blob columns: {@code SELECT read_blob(payload_a), + * payload_b FROM table}. One column is materialized via {@code read_blob()}, the other is + * left as a struct. This is the case explicitly raised in PR review — under the DESCRIPTOR + * default, a mixed query asking for bytes on one column and a pointer on another must fail + * loudly rather than silently returning one materialized and one synthesized shape. + * + * - CONTENT: {@code payload_a} resolves to bytes (1-hop), {@code payload_b} comes back as + * the same content-shape struct as {@code testBlobInlineMultipleColumnsPlainSelect} + * (data=bytes, reference present-but-empty). Pinning both shapes in the same row + * confirms the projection doesn't bleed across columns. 
+ * - DESCRIPTOR (default): the {@code read_blob()} call on {@code payload_a} still hits + * the INLINE+DESCRIPTOR branch even though {@code payload_b} is only being projected as + * a struct. {@code payload_b}'s shape doesn't soften the failure. + */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testBlobInlineMultipleColumnsMixedSelect(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_blob_multi_mixed_${tableType.name().toLowerCase}" + val numRows = 4 + val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable( + tableType, tableName, numRows) + val modeKey = "hoodie.read.blob.inline.mode" + + // CONTENT: read_blob() materializes payload_a; payload_b returned as content-shape struct. + val contentView = s"${tableName}_content_view" + spark.read.format("hudi") + .option(modeKey, "CONTENT") + .load(tablePath) + .createOrReplaceTempView(contentView) + val rows = spark.sql( + s"SELECT id, read_blob(payload_a) AS bytes_a, payload_b " + + s"FROM $contentView ORDER BY id").collect() + assertEquals(numRows, rows.length) + rows.zipWithIndex.foreach { case (row, i) => + assertEquals(i, row.getInt(row.fieldIndex("id"))) + assertArrayEquals(payloadsA(i), row.getAs[Array[Byte]]("bytes_a"), + s"read_blob(payload_a) should return bytes under CONTENT (id=$i)") + val payloadB = row.getStruct(row.fieldIndex("payload_b")) + assertEquals(HoodieSchema.Blob.INLINE, + payloadB.getString(payloadB.fieldIndex(HoodieSchema.Blob.TYPE)), + s"payload_b: type should remain INLINE under CONTENT (id=$i)") + assertArrayEquals(payloadsB(i), + payloadB.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD), + s"payload_b: data should match written bytes under CONTENT (id=$i)") + val refIdx = payloadB.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE) + if (!payloadB.isNullAt(refIdx)) { + val ref = payloadB.getStruct(refIdx) + assertTrue(ref.isNullAt(ref.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH)), + s"payload_b: 
reference.external_path should be null under CONTENT (id=$i)") + } + } + + // DESCRIPTOR (default): read_blob(payload_a) trips even though payload_b is just a struct. + val descView = s"${tableName}_desc_view" + spark.read.format("hudi") + .load(tablePath) + .createOrReplaceTempView(descView) + val ex = assertThrows(classOf[Throwable], new Executable { + override def execute(): Unit = { + spark.sql( + s"SELECT id, read_blob(payload_a) AS bytes_a, payload_b " + + s"FROM $descView ORDER BY id").collect() + } + }) + val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ != null) + .flatMap(t => Option(t.getMessage)).mkString(" | ") + assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"), + s"read_blob(payload_a) under DESCRIPTOR must throw INLINE+DESCRIPTOR error even when " + + s"mixed with a struct projection of another blob column; got: $msgChain") + } + + /** + * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default. MOR compaction reads + * the base file via {@link HoodieSparkLanceReader}, which hard-pins CONTENT regardless of the + * user-facing {@code hoodie.read.blob.inline.mode}. If that pin were to honor the default + * (DESCRIPTOR), compaction would read null {@code data} and rewrite a base file without bytes, + * silently corrupting untouched rows. This test inserts INLINE blobs, upserts a subset to force + * compaction, and asserts that touched rows carry the new bytes while untouched rows retain the + * originals. 
+ */ + @Test + def testBlobInlineCompactionRoundTrip(): Unit = { + val tableType = HoodieTableType.MERGE_ON_READ + val tableName = "test_lance_blob_inline_compact_mor" + val tablePath = s"$basePath/$tableName" + + val payloadLen = 1024 + val numRows = 6 + val initialPayloads: Seq[Array[Byte]] = (0 until numRows).map { i => + (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray + } + val sparkSess = spark + import sparkSess.implicits._ + + val canonicalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, + BlobTestHelpers.blobMetadata) + )) + def asInlineDf(idToBytes: Seq[(Int, Array[Byte])]): DataFrame = { + val rawDf = idToBytes.toDF("id", "bytes") + .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes")) + spark.createDataFrame(rawDf.rdd, canonicalSchema) + } + + // First commit: bulk_insert ids 0..5 with the initial pattern. Lands in a base file. + writeDataframe(tableType, tableName, tablePath, + asInlineDf(initialPayloads.zipWithIndex.map { case (b, i) => (i, b) }), + saveMode = SaveMode.Overwrite, + operation = Some("bulk_insert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id")) + + assertLanceBlobEncoding(tablePath) + + // Second commit: upsert ids 0..2 with all-0xEE payloads, triggering inline compaction. The + // compactor reads the base file + log via the CONTENT-pinned reader and rewrites a new base + // file. Ids 3..5 are untouched: their bytes must survive the compaction read/rewrite even + // though the user-facing default is now DESCRIPTOR. 
+ val updatedPayloadByte: Byte = 0xEE.toByte + val updatedIds = 0 until 3 + val updatedPayloads = updatedIds.map(i => (i, Array.fill[Byte](payloadLen)(updatedPayloadByte))) + writeDataframe(tableType, tableName, tablePath, + asInlineDf(updatedPayloads), + operation = Some("upsert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id", + "hoodie.compact.inline" -> "true", + "hoodie.compact.inline.max.delta.commits" -> "1")) + + val metaClient = HoodieTableMetaClient.builder() + .setConf(HoodieTestUtils.getDefaultStorageConf) + .setBasePath(tablePath) + .build() + val completedInstants = metaClient.reloadActiveTimeline().filterCompletedInstants() + .getInstants.asScala + val deltaCommits = completedInstants.filter(_.getAction == "deltacommit") + assertTrue(deltaCommits.nonEmpty, + "Upsert must have written a deltacommit on MOR — without log files the compaction " + + "round-trip below would be a no-op and the test would silently pass even if the " + + "CONTENT-pin in HoodieSparkLanceReader were broken.") + val compactionCommits = completedInstants.filter(_.getAction == "commit") + assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert") + + // Walk file groups in the (non-partitioned) table and verify at least one historical file + // slice carries log files. After compaction the latest slice is post-compaction (no logs), + // but the pre-compaction slice is still in the FSV's history, so `hasLogFiles` will flag + // it. This catches a regression where the upsert silently fell into a CoW-like path. 
+ val engineCtx = new HoodieLocalEngineContext(metaClient.getStorageConf) + val metadataCfg = HoodieMetadataConfig.newBuilder.build + val viewManager = FileSystemViewManager.createViewManager( + engineCtx, metadataCfg, FileSystemViewStorageConfig.newBuilder.build, + HoodieCommonConfig.newBuilder.build, + (mc: HoodieTableMetaClient) => metaClient.getTableFormat + .getMetadataFactory.create(engineCtx, mc.getStorage, metadataCfg, tablePath)) + val fsView = viewManager.getFileSystemView(metaClient) + try { + fsView.loadAllPartitions() + val anyHadLogs = fsView.getAllFileGroups("").iterator().asScala.exists { fg => + fg.getAllFileSlices.iterator().asScala.exists(_.hasLogFiles) + } + assertTrue(anyHadLogs, + s"MOR upsert must have produced log files in at least one file slice at $tablePath; " + + s"none observed — upsert may have silently bypassed the deltacommit path") + } finally { + fsView.close() + } + + val expected: Map[Int, Array[Byte]] = ( + updatedIds.map(i => i -> Array.fill[Byte](payloadLen)(updatedPayloadByte)) ++ + (updatedIds.length until numRows).map(i => i -> initialPayloads(i)) + ).toMap + + // Verify via the realistic user-facing path. After the flip, a plain read yields the + // DESCRIPTOR shape: INLINE type, null `data`, populated reference. This confirms the new + // default is in effect end-to-end. 
+ val readRows = spark.read.format("hudi") + .load(tablePath) + .select($"id", $"payload") + .orderBy($"id") + .collect() + assertEquals(numRows, readRows.length) + readRows.foreach { row => + val id = row.getInt(row.fieldIndex("id")) + val payload = row.getStruct(row.fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)), + s"Type must remain INLINE post-compaction (id=$id)") + assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)), + s"DESCRIPTOR default should null `data` on plain read (id=$id)") + assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)), + s"DESCRIPTOR default should populate reference on plain read (id=$id)") + } + + // read_blob() under CONTENT mode is what we use to verify the post-compaction bytes + // because read_blob() on INLINE rows throws under the DESCRIPTOR default. The bytes can + // only come back if HoodieSparkLanceReader's CONTENT pin held during the compactor's + // base-file read — otherwise untouched ids 3..5 would have been rewritten with null + // `data` and CONTENT-mode read would surface that. 
+ val viewName = s"${tableName}_view" + spark.read.format("hudi") + .option("hoodie.read.blob.inline.mode", "CONTENT") + .load(tablePath) + .createOrReplaceTempView(viewName) + val materialized = spark.sql( + s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect() + assertEquals(numRows, materialized.length) + materialized.foreach { row => + val id = row.getInt(row.fieldIndex("id")) val bytes = row.getAs[Array[Byte]]("bytes") - assertArrayEquals(expectedPayloads(i), bytes, - s"read_blob() bytes mismatch for id=$i") + assertArrayEquals(expected(id), bytes, + s"read_blob() must return correct bytes post-compaction (id=$id)") } } diff --git a/rfc/rfc-100/rfc-100.md b/rfc/rfc-100/rfc-100.md index 2c637ccb2db2..2fc8e33d781f 100644 --- a/rfc/rfc-100/rfc-100.md +++ b/rfc/rfc-100/rfc-100.md @@ -130,65 +130,86 @@ SELECT id, url, read_blob(image_blob) as image_bytes FROM my_table; #### Read Modes: `read_blob` vs. `SELECT *` -`read_blob(<blob_column>)` is the canonical, universal API for materializing raw blob bytes in a query. It always returns the underlying `bytes` regardless of: -- Storage strategy (`INLINE` vs `OUT_OF_LINE`) -- Base file format (Parquet, Lance, …) -- Any reader-side config such as `hoodie.read.blob.inline.mode` +`read_blob(<blob_column>)` is the canonical API for materializing raw blob bytes in a query. It returns the underlying `bytes` for: +- Any `OUT_OF_LINE` row (via external pread), regardless of mode. +- `INLINE` rows under `CONTENT` mode (1-hop passthrough of `data`). +- `INLINE` rows on Parquet under either mode (Parquet's DESCRIPTOR is a no-op today; rows arrive in CONTENT shape). + +`INLINE` rows on Lance under `DESCRIPTOR` mode are **not supported** by `read_blob()` and the call throws a clear error. DESCRIPTOR is a metadata-only mode for INLINE rows: bytes aren't materialized during the scan, and the synthesized `reference` is an internal pointer into the `.lance` file's storage layout, not user-facing metadata. 
To read bytes, set `hoodie.read.blob.inline.mode=CONTENT`. Selecting the blob column directly (e.g. `SELECT image_blob FROM t` or `SELECT *`) returns the underlying `Blob` struct as-is. The contents of that struct depend on the storage strategy, the file format, and the read mode, as summarized below. **Reader Configuration** -- `hoodie.read.blob.inline.mode` — values `CONTENT` (default) | `DESCRIPTOR`. +- `hoodie.read.blob.inline.mode` — values `DESCRIPTOR` (default) | `CONTENT`. + - `DESCRIPTOR` (default): metadata-only for `INLINE` rows. On Lance, the engine returns a synthesized `reference` pointing at the backing file so callers can inspect `(path, offset, length)` without materializing bytes. `read_blob()` on these rows is **unsupported and throws**, and `col.data` is `NULL` — switch to `CONTENT` if you want bytes (then either `read_blob()` or `col.data` returns them). - `CONTENT`: the engine eagerly materializes inline bytes into the struct's `data` field. - - `DESCRIPTOR`: the engine returns an `OUT_OF_LINE`-shaped descriptor in the `reference` field where the underlying file format supports it (Lance today), enabling lazy byte materialization via `read_blob`. For file formats without a native descriptor for inline payloads (Parquet), both `data` and `reference` are returned `NULL`, and the caller must use `read_blob` to retrieve bytes. - - This config governs `INLINE` reads only. For `OUT_OF_LINE` storage, the engine always returns a populated `reference` regardless of this setting. + - This config governs `INLINE` reads only. For `OUT_OF_LINE` storage, the engine always returns a populated `reference` regardless of this setting, and `read_blob()` always materializes bytes. **Behavior matrix** -| Access pattern | Storage | File format | `hoodie.read.blob.inline.mode` | `data` field | `reference` field | Raw bytes available? 
| -|------------------|--------------|-------------|--------------------------------|--------------|------------------------------|---------------------------------------------------| -| `SELECT read_blob(col) FROM table` | INLINE | Parquet | (any) | n/a | n/a | Yes — returns bytes | -| `SELECT read_blob(col) FROM table` | INLINE | Lance | (any) | n/a | n/a | Yes — returns bytes | -| `SELECT read_blob(col) FROM table` | OUT_OF_LINE | (any) | (any) | n/a | n/a | Yes — returns bytes | -| `SELECT col FROM table` | INLINE | Parquet | `CONTENT` (default) | bytes | NULL | Yes — via `data` | -| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR` | **NULL** | **NULL** | No — must call `read_blob` | -| `SELECT col FROM table` | INLINE | Lance | `CONTENT` (default) | bytes | NULL | Yes — via `data` | -| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR` | NULL | populated (Lance blob enc.) | No — descriptor visible; use `read_blob` for bytes| -| `SELECT col FROM table` | OUT_OF_LINE | (any) | (irrelevant) | NULL | populated | No — must call `read_blob` | +| Access pattern | Storage | File format | `hoodie.read.blob.inline.mode` | `data` field | `reference` field | Raw bytes available? | +|------------------|--------------|-------------|--------------------------------|--------------|-----------------------------|---------------------------------------------------| +| `SELECT read_blob(col) FROM table` | INLINE | Parquet | (any) | n/a | n/a | Yes — returns bytes (1-hop) | +| `SELECT read_blob(col) FROM table` | INLINE | Lance | `CONTENT` | n/a | n/a | Yes — returns bytes (1-hop) | +| `SELECT read_blob(col) FROM table` | INLINE | Lance | `DESCRIPTOR` (default) | n/a | n/a | **No — throws.** Set mode to `CONTENT` to read bytes. 
| +| `SELECT read_blob(col) FROM table` | OUT_OF_LINE | (any) | (any) | n/a | n/a | Yes — returns bytes (external pread) | +| `SELECT col FROM table` | INLINE | Parquet | `CONTENT` | bytes | NULL | Yes — via `data` | +| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR` (default) | bytes¹ | NULL | Yes — via `data`¹ | +| `SELECT col FROM table` | INLINE | Lance | `CONTENT` | bytes | NULL | Yes — via `data` | +| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR` (default) | NULL | populated (synthesized) | No — metadata-only; switch mode to read bytes | +| `SELECT col FROM table` | OUT_OF_LINE | (any) | (irrelevant) | NULL | populated | No — must call `read_blob` | + **Why Parquet and Lance differ in `DESCRIPTOR` mode** -Lance's native blob encoding stores blobs in a way that already exposes a `(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it directly in the `reference` field — effectively letting INLINE blobs be read with the same deferred-materialization path used for OUT_OF_LINE references. Parquet has no equivalent native descriptor for an inline byte array, so both fields are `NULL` in `DESCRIPTOR` mode and the caller must use `read_blob` to materialize bytes. +Lance's native blob encoding stores blobs in a way that already exposes a `(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it directly in the `reference` field as metadata only — the synthesized descriptor is an internal pointer, and `read_blob()` does not follow it for INLINE rows. Parquet has no equivalent native descriptor for an inline byte array, which is why the DESCRIPTOR path is currently a no-op there (¹ in the matrix above: Parquet rows come back in CONTENT shape, with `data` populated). 
**Visual** +What the user gets back, grouped by storage type (set at write time) and then by query shape: + +```mermaid +flowchart TD + ST{storage_type} + + ST -->|OUT_OF_LINE| QO{Query} + QO -->|"SELECT col"| OOL["type = OUT_OF_LINE<br/>inline_data = NULL<br/>reference = user-supplied"] + QO -->|"SELECT read_blob(col)"| RBO(["bytes — materialized<br/>via the external reference"]) + + ST -->|INLINE| QI{Query} + QI -->|"SELECT col"| M{hoodie.read.blob.inline.mode} + M -->|CONTENT| CONT["type = INLINE<br/>inline_data = bytes<br/>reference = NULL"] + M -->|DESCRIPTOR default| F{file format} + F -->|Lance| LD["type = INLINE<br/>inline_data = NULL<br/>reference = synthetic managed<br/>path, offset, length, is_managed=true"] + F -->|"Parquet (today: mode no-op)"| PD["Parquet reader does not<br/>implement DESCRIPTOR yet —<br/>returns CONTENT shape:<br/>inline_data = bytes, reference = NULL"] + + QI -->|"SELECT read_blob(col)"| RM{hoodie.read.blob.inline.mode} + RM -->|CONTENT| RBC(["bytes from inline_data on the row<br/>1 hop"]) + RM -->|DESCRIPTOR default| RF{file format} + RF -->|Lance| RBL(["error: read_blob unsupported<br/>on INLINE rows under DESCRIPTOR<br/>(metadata-only mode)"]) + RF -->|"Parquet (today: mode no-op)"| RBP(["bytes from inline_data on the row<br/>1 hop — same as CONTENT"]) ``` - ┌──────────────────────────────────────────────────────────────────┐ - │ read_blob(col) ── universal, always materializes bytes ──│ - │ │ │ - │ ▼ │ - │ ┌─────────────┐ INLINE ───► read inline payload │ - │ │ Hudi reader │ ──┤ │ - │ └─────────────┘ OUT_OF_LINE ► follow reference → read bytes │ - └──────────────────────────────────────────────────────────────────┘ - - ┌──────────────────────────────────────────────────────────────────┐ - │ SELECT col (returns Blob struct as-is) │ - │ │ │ - │ ▼ │ - │ storage = OUT_OF_LINE ─────────────► data=NULL, reference=set │ - │ │ - │ storage = INLINE, │ - │ inline.mode = CONTENT (default) ───► data=<bytes>, ref=NULL │ - │ │ - │ storage = 
INLINE, │ - │ inline.mode = DESCRIPTOR │ - │ ├─ Parquet ─────────────────────► data=NULL, ref=NULL │ - │ └─ Lance ─────────────────────► data=NULL, ref=set │ - └──────────────────────────────────────────────────────────────────┘ + +`read_blob(col)` byte resolution — hop count depends on the row shape that arrives. INLINE rows under DESCRIPTOR mode are rejected; the mode is metadata-only and the synthesized reference is an internal pointer, not user-facing storage. + +```mermaid +flowchart LR + RB[/"SELECT read_blob(col)"/] --> Scan["Scan emits Blob struct"] + Scan --> Shape{"row shape"} + Shape -->|"inline_data populated<br/>(CONTENT mode, or Parquet)"| Direct["BatchedBlobReader:<br/>read inline_data off the row<br/><b>1 hop total</b>"] + Shape -->|"reference populated"| Type{"storage_type"} + Type -->|"OUT_OF_LINE"| Indirect["BatchedBlobReader:<br/>openSeekable(external_path)<br/>seek(offset), readFully(length)<br/><b>2 hops total</b>"] + Type -->|"INLINE<br/>(DESCRIPTOR + Lance)"| Err["error: read_blob unsupported<br/>on INLINE rows under DESCRIPTOR"] + Direct --> Bytes(["bytes"]) + Indirect --> Bytes ``` +Notes: +- INLINE + DESCRIPTOR + `read_blob()` is unsupported by design. DESCRIPTOR is metadata-only for INLINE rows: bytes aren't materialized during the scan, and the synthesized `reference` is an internal pointer into the `.lance` file's storage layout, not user-facing metadata. To read bytes, set `hoodie.read.blob.inline.mode=CONTENT`. +- OUT_OF_LINE is unaffected by `hoodie.read.blob.inline.mode` — the descriptor is real user metadata and `read_blob()` always works via an external pread. +- Plain `SELECT col` (no `read_blob`) is always 1 hop. DESCRIPTOR's win is that hop 1 skips blob decoding when bytes aren't needed. + ### 3. Writer #### Phase 1: External Blob Support The writer will be updated to support writing blob data as out-of-line references.
