hudi-agent commented on code in PR #18683:
URL: https://github.com/apache/hudi/pull/18683#discussion_r3211521485
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -86,9 +86,25 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
                                                shouldUseRecordPosition: Boolean,
                                                requiredFilters: Seq[Filter],
                                                isMultipleBaseFileFormatsEnabled: Boolean,
-                                               hoodieFileFormat: HoodieFileFormat)
+                                               val hoodieFileFormat: HoodieFileFormat,
+                                               initialBlobDescriptorMode: Boolean = false)
   extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait with Logging with Serializable {
+  // Mutable so ReadBlobRule can flip DESCRIPTOR→CONTENT when read_blob() appears in a query.
+  // We mutate in place because Spark's planner/AQE retains a reference to this FileFormat
+  // instance even after the optimizer rewrites the LogicalRelation, so swapping wouldn't stick.
+  @volatile private var _isBlobDescriptorMode: Boolean = initialBlobDescriptorMode
Review Comment:
🤖 I'm worried this mutable flag races across concurrent queries that share a
FileFormat instance via a temp view. If query A (uses `read_blob()`) sets the
flag to `false` during its optimization, and query B (no `read_blob()`) runs
concurrently and calls `restoreBlobDescriptorMode()` before A's tasks are
serialized, A's executors will deserialize the FileFormat with `flag=true` —
the executor-side `readBaseFile` (line 565) reads `isBlobDescriptorMode` and
would strip the data column, causing `read_blob()` to silently return null.
Same hazard in the opposite direction for B. Async query submission (e.g.
`Future`s, Airflow, multi-cell notebooks) makes this realistic. Have you
considered passing the per-query mode through the LogicalRelation's `options`
map / Hadoop conf at rule time instead of mutating shared state? @nsivabalan
@yihua thoughts on whether this concurrency surface is acceptable here?
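For concreteness, one shape the options-map route could take (the key name, default, and helper below are assumptions for illustration, not this PR's API): the rule stamps the mode onto the scan's options when it rewrites the relation, and the FileFormat resolves it from the `options` argument of `buildReaderWithPartitionValues`, so each query's reader closure captures its own value instead of consulting shared mutable state.

```scala
// Hypothetical option key; reuse whatever naming convention the module already has.
object BlobDescriptorMode {
  val Key = "hoodie.internal.read.blob.descriptor.mode"

  // Defaults to DESCRIPTOR (true) when the rule never touched the scan's options,
  // i.e. the query contains no read_blob().
  def fromOptions(options: Map[String, String]): Boolean =
    options.get(Key).forall(_.toBoolean)
}
```

`buildReaderWithPartitionValues` would call `BlobDescriptorMode.fromOptions(options)` once on the driver; the returned read function captures that boolean, so it serializes with this query's tasks and a concurrent query cannot flip it afterwards.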
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -80,12 +90,39 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
         throw new IllegalArgumentException(
           s"read_blob() may only appear in SELECT or WHERE clauses. Found in unsupported logical plan node: ${node.nodeName}. " +
             s"Move read_blob() to a SELECT or WHERE clause. Full plan: ${node.simpleStringWithNodeId()}")
+      }
     }

   private def containsReadBlobInAnyExpression(plan: LogicalPlan): Boolean = {
     plan.expressions.exists(containsReadBlobInExpression)
   }

+  private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = {
Review Comment:
🤖 nit: `containsReadBlobAnywhere` is dangerously close to the existing
`containsReadBlobInAnyExpression` just above it, but does something
meaningfully different — it walks the entire plan tree rather than checking
only the current node's expressions. A name like `planTreeContainsReadBlob` or
`treeContainsReadBlob` would make the scope distinction immediately obvious to
the next person who touches this rule.
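For example (name purely illustrative, body taken verbatim from this diff):

```scala
// Walks every node in the plan tree, unlike containsReadBlobInAnyExpression,
// which only inspects the current node's own expressions.
private def planTreeContainsReadBlob(plan: LogicalPlan): Boolean =
  plan.exists(node => node.expressions.exists(containsReadBlobInExpression))
```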
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -41,7 +44,14 @@ import scala.collection.mutable.ArrayBuffer
  */
 case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {

-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Parquet DESCRIPTOR strips the data column for I/O savings, but Parquet has no byte-range
+    // channel like Lance, so read_blob() would return null. Force CONTENT for scans whose query
+    // uses read_blob(), and restore DESCRIPTOR for queries that don't (the FileFormat instance
+    // is shared across queries against the same temp view).
+    val needsContentMode = containsReadBlobAnywhere(plan) || plan.exists(_.isInstanceOf[BatchedBlobRead])
+    syncDescriptorMode(plan, forceContentMode = needsContentMode)
Review Comment:
🤖 `syncDescriptorMode` is a side effect inside an optimizer rule's `apply`.
Spark's optimizer batches typically run rules to a fixed point, so this can
fire multiple times per query. It's idempotent within a single query (good),
but it is the ordering of this side effect on the shared FileFormat across concurrent
queries' optimizer passes that creates the race I flagged in
HoodieFileGroupReaderBasedFileFormat. Have you considered `mapChildren` /
structural rewrites to encode the mode in the plan (e.g. as a hint or option)
rather than mutating the FileFormat?
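A rough sketch of that variant, inside ReadBlobRule (the option key matches the hypothetical one suggested on HoodieFileGroupReaderBasedFileFormat; pattern shape and `copy` wiring depend on the Spark versions Hudi targets, so treat this as illustrative only):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat

// Pure structural rewrite: the blob mode travels with this query's plan instead of
// being flipped on a FileFormat instance shared across queries.
private def forceContentMode(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
  case lr: LogicalRelation if lr.relation.isInstanceOf[HadoopFsRelation] &&
      lr.relation.asInstanceOf[HadoopFsRelation].fileFormat
        .isInstanceOf[HoodieFileGroupReaderBasedFileFormat] =>
    val fs = lr.relation.asInstanceOf[HadoopFsRelation]
    // Hypothetical option key; the scan's options are copied per query plan.
    val newOptions = fs.options + ("hoodie.internal.read.blob.descriptor.mode" -> "false")
    lr.copy(relation = fs.copy(options = newOptions)(fs.sparkSession))
}
```

Because the rewrite is purely functional over the plan and idempotent, re-running it on later fixed-point iterations is harmless, and nothing leaks across queries.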
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -120,12 +142,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
           readSchema, StructType(Seq.empty),
           getSchemaHandler.getInternalSchemaOpt,
           readFilters,
           storage.getConf.asInstanceOf[StorageConfiguration[Configuration]],
           tableSchemaOpt))
-      // Post-process: convert binary VECTOR columns back to typed arrays
-      if (vectorColumnInfo.nonEmpty) {
-        SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, vectorColumnInfo, readSchema)
+      // Post-process: re-insert null `data` field into blob structs, then convert vectors
+      val blobPaddedIterator = if (blobColumnIndices.nonEmpty) {
+        SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding(rawIterator, blobColumnIndices, readSchema, parquetReadStructType)
       } else {
         rawIterator
       }
+
+      if (vectorColumnInfo.nonEmpty) {
+        SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(blobPaddedIterator, vectorColumnInfo, if (blobColumnIndices.nonEmpty) parquetReadStructType else readSchema)
+      } else {
+        blobPaddedIterator
+      }
Review Comment:
🤖 For MOR tables with log files, the log-file branch (lines 123-126) reads
with the full `requiredSchema`, so log records keep their populated `data`
field, while base-file records under DESCRIPTOR get `data=null`. After merge
the user sees a mix: records updated via log have bytes, records still in base
have null. Is this the intended semantics, or should DESCRIPTOR also null the
data on log-file rows for consistency?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala:
##########
@@ -80,12 +90,39 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] {
         throw new IllegalArgumentException(
           s"read_blob() may only appear in SELECT or WHERE clauses. Found in unsupported logical plan node: ${node.nodeName}. " +
             s"Move read_blob() to a SELECT or WHERE clause. Full plan: ${node.simpleStringWithNodeId()}")
+      }
     }

   private def containsReadBlobInAnyExpression(plan: LogicalPlan): Boolean = {
     plan.expressions.exists(containsReadBlobInExpression)
   }

+  private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = {
+    plan.exists(node => node.expressions.exists(containsReadBlobInExpression))
+  }
+
+  // Lance scans are skipped because their hoodieFileFormat != PARQUET and Lance already
+  // supports read_blob() under DESCRIPTOR via byte-range references.
+  private def syncDescriptorMode(plan: LogicalPlan, forceContentMode: Boolean): Unit = {
Review Comment:
🤖 nit: `syncDescriptorMode` reads like a concurrency-sync operation rather
than a policy-apply step. Something like `applyDescriptorModePolicy` or
`updateDescriptorModeForPlan` would make it clearer that this is deciding
whether to downgrade DESCRIPTOR→CONTENT based on query content, not
synchronizing threads.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]