hudi-agent commented on code in PR #18683:
URL: https://github.com/apache/hudi/pull/18683#discussion_r3211521485


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -86,9 +86,25 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
                                            shouldUseRecordPosition: Boolean,
                                            requiredFilters: Seq[Filter],
                                            isMultipleBaseFileFormatsEnabled: 
Boolean,
-                                           hoodieFileFormat: HoodieFileFormat)
+                                           val hoodieFileFormat: 
HoodieFileFormat,
+                                           initialBlobDescriptorMode: Boolean 
= false)
   extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait 
with Logging with Serializable {
 
+  // Mutable so ReadBlobRule can flip DESCRIPTOR→CONTENT when read_blob() 
appears in a query.
+  // We mutate in place because Spark's planner/AQE retains a reference to 
this FileFormat
+  // instance even after the optimizer rewrites the LogicalRelation, so 
swapping wouldn't stick.
+  @volatile private var _isBlobDescriptorMode: Boolean = 
initialBlobDescriptorMode

Review Comment:
   🤖 I'm worried this mutable flag races across concurrent queries that share a 
FileFormat instance via a temp view. If query A (uses `read_blob()`) sets the 
flag to `false` during its optimization, and query B (no `read_blob()`) runs 
concurrently and calls `restoreBlobDescriptorMode()` before A's tasks are 
serialized, A's executors will deserialize the FileFormat with `flag=true` — 
the executor-side `readBaseFile` (line 565) reads `isBlobDescriptorMode` and 
would strip the data column, causing `read_blob()` to silently return null. 
Same hazard in the opposite direction for B. Async query submission (e.g. 
`Future`s, Airflow, multi-cell notebooks) makes this realistic. Have you 
considered passing the per-query mode through the LogicalRelation's `options` 
map / Hadoop conf at rule time instead of mutating shared state? @nsivabalan 
@yihua thoughts on whether this concurrency surface is acceptable here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -80,12 +90,39 @@ case class ReadBlobRule(spark: SparkSession) extends 
Rule[LogicalPlan] {
       throw new IllegalArgumentException(
         s"read_blob() may only appear in SELECT or WHERE clauses. Found in 
unsupported logical plan node: ${node.nodeName}. " +
         s"Move read_blob() to a SELECT or WHERE clause. Full plan: 
${node.simpleStringWithNodeId()}")
+    }
   }
 
   private def containsReadBlobInAnyExpression(plan: LogicalPlan): Boolean = {
     plan.expressions.exists(containsReadBlobInExpression)
   }
 
+  private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = {

Review Comment:
   🤖 nit: `containsReadBlobAnywhere` is dangerously close to the existing 
`containsReadBlobInAnyExpression` just above it, but does something 
meaningfully different — it walks the entire plan tree rather than checking 
only the current node's expressions. A name like `planTreeContainsReadBlob` or 
`treeContainsReadBlob` would make the scope distinction immediately obvious to 
the next person who touches this rule.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -41,7 +44,14 @@ import scala.collection.mutable.ArrayBuffer
  */
 case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp 
{
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Parquet DESCRIPTOR strips the data column for I/O savings, but Parquet 
has no byte-range
+    // channel like Lance, so read_blob() would return null. Force CONTENT for 
scans whose query
+    // uses read_blob(), and restore DESCRIPTOR for queries that don't (the 
FileFormat instance
+    // is shared across queries against the same temp view).
+    val needsContentMode = containsReadBlobAnywhere(plan) || 
plan.exists(_.isInstanceOf[BatchedBlobRead])
+    syncDescriptorMode(plan, forceContentMode = needsContentMode)

Review Comment:
   🤖 `syncDescriptorMode` is a side effect inside an optimizer rule's `apply`. 
Spark's optimizer batches typically run rules to a fixed point, so this can 
fire multiple times per query. It's idempotent within a single query (good), 
but the side effect on a shared FileFormat ordering between concurrent queries' 
optimizer passes is what creates the race I flagged in 
HoodieFileGroupReaderBasedFileFormat. Have you considered `mapChildren` / 
structural rewrites to encode the mode in the plan (e.g. as a hint or option) 
rather than mutating the FileFormat?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -120,12 +142,18 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
         readSchema, StructType(Seq.empty), 
getSchemaHandler.getInternalSchemaOpt,
         readFilters, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], 
tableSchemaOpt))
 
-      // Post-process: convert binary VECTOR columns back to typed arrays
-      if (vectorColumnInfo.nonEmpty) {
-        
SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, 
vectorColumnInfo, readSchema)
+      // Post-process: re-insert null `data` field into blob structs, then 
convert vectors
+      val blobPaddedIterator = if (blobColumnIndices.nonEmpty) {
+        
SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding(rawIterator, 
blobColumnIndices, readSchema, parquetReadStructType)
       } else {
         rawIterator
       }
+
+      if (vectorColumnInfo.nonEmpty) {
+        
SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(blobPaddedIterator,
 vectorColumnInfo, if (blobColumnIndices.nonEmpty) parquetReadStructType else 
readSchema)
+      } else {
+        blobPaddedIterator
+      }

Review Comment:
   🤖 For MOR tables with log files, the log-file branch (line 123-126) reads 
with the full `requiredSchema`, so log records keep their populated `data` 
field, while base-file records under DESCRIPTOR get `data=null`. After merge 
the user sees a mix: records updated via log have bytes, records still in base 
have null. Is this the intended semantics, or should DESCRIPTOR also null the 
data on log-file rows for consistency?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -80,12 +90,39 @@ case class ReadBlobRule(spark: SparkSession) extends 
Rule[LogicalPlan] {
       throw new IllegalArgumentException(
         s"read_blob() may only appear in SELECT or WHERE clauses. Found in 
unsupported logical plan node: ${node.nodeName}. " +
         s"Move read_blob() to a SELECT or WHERE clause. Full plan: 
${node.simpleStringWithNodeId()}")
+    }
   }
 
   private def containsReadBlobInAnyExpression(plan: LogicalPlan): Boolean = {
     plan.expressions.exists(containsReadBlobInExpression)
   }
 
+  private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = {
+    plan.exists(node => node.expressions.exists(containsReadBlobInExpression))
+  }
+
+  // Lance scans are skipped because their hoodieFileFormat != PARQUET and 
Lance already
+  // supports read_blob() under DESCRIPTOR via byte-range references.
+  private def syncDescriptorMode(plan: LogicalPlan, forceContentMode: 
Boolean): Unit = {

Review Comment:
   🤖 nit: `syncDescriptorMode` reads like a concurrency-sync operation rather 
than a policy-apply step. Something like `applyDescriptorModePolicy` or 
`updateDescriptorModeForPlan` would make it clearer that this is deciding 
whether to downgrade DESCRIPTOR→CONTENT based on query content, not 
synchronizing threads.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to