hudi-agent commented on code in PR #18683:
URL: https://github.com/apache/hudi/pull/18683#discussion_r3244377074


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -237,11 +237,16 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
 
   override def buildFileFormat(): FileFormat = {
     val tableConfig = metaClient.getTableConfig

Review Comment:
   🤖 nit: could you rename `blobDescriptorMode` to `isBlobDescriptorMode`? It's 
a boolean and the constructor parameter it's passed into 
(`isBlobDescriptorMode`) already uses the `is` prefix — the local val not 
following suit is a small but jarring inconsistency.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -457,6 +491,46 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     }
   }
 
+  /**
+   * Detects BLOB columns and strips the {@code data} sub-field when 
DESCRIPTOR mode is active.
+   * Only applies to Parquet format; other formats handle DESCRIPTOR mode 
natively.
+   *
+   * @param forceContentCols Top-level column names that must keep their 
{@code data} sub-field
+   *                         (i.e. the columns the current query reads via 
{@code read_blob()}).
+   *                         These are excluded from the strip set so the 
bytes are materialized.
+   */
+  private def withBlobDescriptorRewrite(schema: StructType,
+                                        forceContentCols: Set[String]): 
(StructType, Set[Int]) = {
+    if (hoodieFileFormat != HoodieFileFormat.PARQUET) {
+      (schema, Set.empty[Int])
+    } else {
+      import scala.collection.JavaConverters._
+      val detected = 
VectorConversionUtils.detectBlobColumnsFromMetadata(schema).asScala.map(_.intValue()).toSet
+      val toStrip = if (forceContentCols.isEmpty) detected
+        else detected.filterNot(idx => 
forceContentCols.contains(schema.fields(idx).name))
+      if (toStrip.isEmpty) {
+        (schema, Set.empty[Int])
+      } else {
+        val javaBlobCols: java.util.Set[Integer] = 
toStrip.map(Integer.valueOf).asJava
+        (VectorConversionUtils.stripBlobDataField(schema, javaBlobCols), 
toStrip)
+      }
+    }
+  }
+
+  /**
+   * Wraps an iterator to re-insert null {@code data} fields into blob structs
+   * after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs).
+   */
+  private def wrapWithBlobNullPadding(iter: Iterator[InternalRow],

Review Comment:
   🤖 nit: the body of this `wrapWithBlobNullPadding` (create projection → 
create mapper → wrap iterator) is almost identical to 
`SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding` at line 421 
of that file — only the iterator type differs. Have you considered 
consolidating the shared setup into `VectorConversionUtils` or a small shared 
utility so the two call sites can't silently diverge if 
`buildBlobNullPadRowMapper`'s signature changes?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -495,4 +499,296 @@ class TestReadBlobSQL extends HoodieClientTestBase {
       assertBytesContent(row.getAs[Array[Byte]]("data"), expectedOffset = idx 
* 100)
     }
   }
+
+  // ------------------------------------------------------------------
+  // Parquet DESCRIPTOR-mode interaction tests
+  //
+  // These exercise the per-query rewrite added by ReadBlobRule that
+  // injects BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS into the
+  // LogicalRelation's options when a query uses read_blob(). The
+  // contract: read_blob(col) always returns bytes; plain SELECT keeps
+  // DESCRIPTOR's I/O savings (data sub-field is null) for the columns
+  // that aren't referenced by read_blob().
+  // ------------------------------------------------------------------
+
+  /**
+   * Helpers for the DESCRIPTOR-mode tests. Builds a Hudi table containing
+   * one or two INLINE blob columns and returns the table path.
+   */
+  private def writeInlineBlobTable(name: String,
+                                   tableType: HoodieTableType,
+                                   payloads: Seq[Array[Byte]]): String = {
+    val tablePath = s"$tempDir/$name"
+    val rawDf = sparkSession.createDataFrame(
+        payloads.zipWithIndex.map { case (bytes, i) => (i + 1, bytes) })
+      .toDF("id", "bytes")
+      .withColumn("payload", inlineBlobStructCol("payload", col("bytes")))
+      .select("id", "payload")
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = 
true, blobMetadata)
+    ))
+    val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema)
+    df.write.format("hudi")
+      .option("hoodie.table.name", name)
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id")
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+      .option(DataSourceWriteOptions.OPERATION.key(), "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+    tablePath
+  }
+
+  /**
+   * Core contract: read_blob() always materializes bytes, even under
+   * DESCRIPTOR mode, on both COW and MOR base files. Without this fix,
+   * read_blob() would see a null `data` sub-field (column-pruned by
+   * Parquet) and silently return null.
+   */
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testReadBlobUnderDescriptorMaterializesBytes(tableType: 
HoodieTableType): Unit = {
+    val payloads = Seq(
+      Array.fill[Byte](128)(0x1.toByte),
+      Array.fill[Byte](128)(0x2.toByte),
+      Array.fill[Byte](128)(0x3.toByte))
+    val tablePath = writeInlineBlobTable(
+      s"read_blob_desc_${tableType.name().toLowerCase}", tableType, payloads)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("rb_desc_view")
+
+    val rows = sparkSession.sql(
+      "SELECT id, read_blob(payload) AS bytes FROM rb_desc_view ORDER BY id"
+    ).collect()
+    assertEquals(3, rows.length)
+    rows.zip(payloads).foreach { case (row, expected) =>
+      val bytes = row.getAs[Array[Byte]]("bytes")
+      assertNotNull(bytes, s"read_blob() must materialize bytes under 
DESCRIPTOR (id=${row.getInt(0)})")
+      assertArrayEquals(expected, bytes, s"bytes mismatch for 
id=${row.getInt(0)}")
+    }
+  }
+
+  /**
+   * DESCRIPTOR savings preserved when read_blob() is NOT in the query:
+   * commit 1's column projection still strips `data`, and the rule writes
+   * no force-content option.
+   */
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testDescriptorWithoutReadBlobStillSkipsData(tableType: HoodieTableType): 
Unit = {
+    val payloads = Seq(
+      Array.fill[Byte](128)(0x1.toByte),
+      Array.fill[Byte](128)(0x2.toByte))
+    val tablePath = writeInlineBlobTable(
+      s"desc_no_rb_${tableType.name().toLowerCase}", tableType, payloads)
+
+    val rows = sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .select(col("id"), col("payload"))
+      .orderBy(col("id"))
+      .collect()
+
+    assertEquals(2, rows.length)
+    rows.foreach { row =>
+      val payload = row.getStruct(row.fieldIndex("payload"))
+      assertEquals(HoodieSchema.Blob.INLINE,
+        payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)))
+      
assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+        s"DESCRIPTOR should null-pad data when read_blob() is absent 
(id=${row.getInt(0)})")
+      
assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+        "Parquet has no native byte-range descriptor; reference is null")
+    }
+  }
+
+  /**
+   * The per-column granularity claim. Query references read_blob(payload_a)
+   * only — payload_a must materialize bytes, payload_b must remain
+   * stripped (DESCRIPTOR savings preserved for the column the user
+   * didn't ask about).
+   *
+   * Uses multiple rows with distinct per-row byte patterns so that any
+   * row-iteration bug (e.g., reusing a row buffer without copying, or
+   * mis-indexing the blob struct ordinal across rows) would surface as
+   * mismatched bytes per row rather than slipping through a single-row
+   * happy path.
+   */
+  @Test
+  def testDescriptorPerColumnGranularity(): Unit = {
+    val tablePath = s"$tempDir/desc_per_column"
+    // Distinct fill byte AND distinct length per row, AND per-row distinction
+    // between payload_a and payload_b — a cross-row or cross-column leak 
fails an assertion.
+    val rows = Seq(
+      (1, Array.fill[Byte](80)(0xA1.toByte), 
Array.fill[Byte](160)(0xB1.toByte)),
+      (2, Array.fill[Byte](64)(0xA2.toByte), 
Array.fill[Byte](192)(0xB2.toByte)),
+      (3, Array.fill[Byte](96)(0xA3.toByte), 
Array.fill[Byte](128)(0xB3.toByte))
+    )
+    val rawDf = sparkSession.createDataFrame(rows)
+      .toDF("id", "bytes_a", "bytes_b")
+      .withColumn("payload_a", inlineBlobStructCol("payload_a", 
col("bytes_a")))
+      .withColumn("payload_b", inlineBlobStructCol("payload_b", 
col("bytes_b")))
+      .select("id", "payload_a", "payload_b")
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload_a", BlobType().asInstanceOf[StructType], nullable = 
true, blobMetadata),
+      StructField("payload_b", BlobType().asInstanceOf[StructType], nullable = 
true, blobMetadata)
+    ))
+    sparkSession.createDataFrame(rawDf.rdd, 
canonicalSchema).write.format("hudi")
+      .option("hoodie.table.name", "desc_per_column")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("desc_per_column_view")
+
+    val outRows = sparkSession.sql(
+      "SELECT id, read_blob(payload_a) AS bytes_a, payload_b " +
+        "FROM desc_per_column_view ORDER BY id"
+    ).collect()
+    assertEquals(rows.length, outRows.length)
+    outRows.zip(rows).foreach { case (row, (expectedId, expectedA, expectedB)) 
=>
+      assertEquals(expectedId, row.getInt(0))
+      val bytesA = row.getAs[Array[Byte]]("bytes_a")
+      assertArrayEquals(expectedA, bytesA,
+        s"read_blob(payload_a) bytes mismatch at id=$expectedId — expected 
length ${expectedA.length}, " +
+          s"got length ${if (bytesA == null) -1 else bytesA.length}")
+      val payloadB = row.getStruct(row.fieldIndex("payload_b"))
+      
assertTrue(payloadB.isNullAt(payloadB.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+        s"DESCRIPTOR savings must be preserved for payload_b at 
id=$expectedId")
+      // Sanity: payload_b's type marker survived even though `data` was 
stripped.
+      assertEquals(HoodieSchema.Blob.INLINE,
+        payloadB.getString(payloadB.fieldIndex(HoodieSchema.Blob.TYPE)),
+        s"payload_b type marker must survive stripping at id=$expectedId")
+      // Sanity: we did NOT smuggle bytes into payload_b under any name.
+      val _ = expectedB // explicitly unused: the contract is that 
payload_b.data must be null
+    }
+  }
+
+  /**
+   * read_blob() in WHERE clause must also trigger the per-query rewrite.
+   * ReadBlobRule's Filter case wraps the condition in BatchedBlobRead;
+   * the second pass collects the blobAttr the same way.
+   */
+  @Test
+  def testReadBlobInWhereClauseUnderDescriptor(): Unit = {
+    val payloads = Seq(
+      Array.fill[Byte](100)(0xA.toByte),
+      Array.fill[Byte](200)(0xB.toByte),
+      Array.fill[Byte](100)(0xC.toByte))
+    val tablePath = writeInlineBlobTable(
+      "desc_where_clause", HoodieTableType.COPY_ON_WRITE, payloads)
+
+    sparkSession.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView("desc_where_view")
+
+    val rows = sparkSession.sql(
+      "SELECT id, read_blob(payload) AS bytes FROM desc_where_view " +
+        "WHERE length(read_blob(payload)) = 200"
+    ).collect()
+    assertEquals(1, rows.length)
+    assertEquals(2, rows(0).getInt(0))
+    assertArrayEquals(payloads(1), rows(0).getAs[Array[Byte]]("bytes"))
+  }
+
+  /**
+   * JOIN of two Hudi Parquet tables, both in DESCRIPTOR mode, both with
+   * a blob column. The query uses read_blob() on only the left side's
+   * blob.
+   *
+   * This exercises ReadBlobRule's per-relation option routing: the
+   * BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS option must land on the
+   * left table's LogicalRelation only, and the right table's payload
+   * must come back with DESCRIPTOR's null `data`. A bug where the
+   * rule writes the option to every Hudi LogicalRelation, or to

Review Comment:
   🤖 Heads-up: `@Ignore` is from JUnit 4 (`org.junit.Ignore`), but the 
surrounding test class uses JUnit 5 (`@Test` from `org.junit.jupiter.api`). The 
Jupiter engine doesn't honor JUnit 4's `@Ignore`, so this test will actually 
execute rather than be skipped — the rest of the file uses `@Disabled` from 
`org.junit.jupiter.api.Disabled` for that purpose. Given the `TODO to 
re-enable` comment, was the intent to skip this? If so, switching to 
`@Disabled("reason")` would do it.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to