kbuci opened a new issue, #18711:
URL: https://github.com/apache/hudi/issues/18711

   ## Problem Statement
   
   Flink's type conversion currently cannot unambiguously map Hudi's unstructured logical types (**Variant**, **Blob**, **Vector**) to the appropriate Flink data types from the Parquet schema alone. The Parquet physical layout for these types is ambiguous without additional context:
   
   | Hudi Type | Parquet Physical Layout | Ambiguity |
   |-----------|------------------------|-----------|
   | **Variant** | `GROUP { required binary metadata; required binary value; }` | Indistinguishable from a user struct `ROW<metadata BYTES, value BYTES>` |
   | **Vector** | `FIXED_LEN_BYTE_ARRAY(N)` | Indistinguishable from any fixed-length binary field |
   | **Blob** | Nested group with canonical fields (`type`, `data`, `reference`) | Indistinguishable from a user struct with those field names |
   
   For Variant, the ambiguity exists because Hudi compiles against **parquet-java 1.13.1** by default ([pom.xml L118](https://github.com/apache/hudi/blob/master/pom.xml#L118)), which predates `VariantLogicalTypeAnnotation` (introduced in parquet-java 1.15.2+). For Blob and Vector, there is no standard Parquet `LogicalTypeAnnotation` at all.
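
   To make the Variant row of the table concrete, here is a small standalone sketch (plain parquet-java, not Hudi code) showing that the group written for a Variant column and an ordinary user struct with the same field names yield structurally equal `MessageType`s, leaving the reader nothing to distinguish them by:

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class VariantAmbiguityDemo {
  public static void main(String[] args) {
    // Physical layout of a Variant column as written without a logical-type annotation
    // (the only option when compiling against parquet-java 1.13.x).
    MessageType variantLayout = Types.buildMessage()
        .requiredGroup()
          .required(PrimitiveTypeName.BINARY).named("metadata")
          .required(PrimitiveTypeName.BINARY).named("value")
        .named("v")
        .named("schema");

    // A plain user struct ROW<metadata BYTES, value BYTES> that happens to use the same names.
    MessageType userStruct = MessageTypeParser.parseMessageType(
        "message schema { required group v { "
            + "required binary metadata; required binary value; } }");

    // Without extra context (annotation, footer metadata, or HoodieSchema) the two are identical.
    System.out.println(variantLayout.equals(userStruct)); // prints: true
  }
}
```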
   
   ### How does the Spark path handle this today?
   
   On the **Spark side**, Hudi relies on mechanisms other than Parquet schema annotations:
   
   - **Vector**: Writes `hoodie.vector.columns` key-value metadata in the 
Parquet file footer 
([HoodieRowParquetWriteSupport.java](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java)),
 plus `hudi_type` metadata on Spark `StructField`. Detection on read uses the 
requested `HoodieSchema` or Spark field metadata — never the Parquet column 
schema alone 
([VectorConversionUtils.java](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java)).
   - **Blob**: Uses Avro `logicalType: blob` and Spark `hudi_type: BLOB` 
metadata. No dedicated Parquet footer key.
   - **Variant**: Currently no footer metadata. The Parquet 
`VariantLogicalTypeAnnotation` is only available at runtime if parquet-java ≥ 
1.15.2 is on the classpath. The existing `hasVariantAnnotation()` check uses **class-name string matching** to avoid a compile-time dependency ([ParquetSchemaConverter.java L430](https://github.com/apache/hudi/blob/master/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java#L430)); a rough sketch of that check follows this list.
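
   For reference, the class-name matching idea behind that check can be sketched roughly as follows (a minimal illustration only; the exact string compared in the real `hasVariantAnnotation()` may differ):

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;

public final class VariantAnnotationProbe {
  /**
   * Detects the Parquet Variant logical-type annotation without a compile-time reference to
   * VariantLogicalTypeAnnotation, which only exists in parquet-java 1.15.2+. Matching on the
   * runtime class name keeps the code compilable against parquet-java 1.13.1.
   */
  public static boolean hasVariantAnnotation(GroupType groupType) {
    LogicalTypeAnnotation annotation = groupType.getLogicalTypeAnnotation();
    return annotation != null
        && annotation.getClass().getSimpleName().contains("Variant");
  }
}
```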
   
   ### Why is this harder for Flink than Spark?
   
   Spark's read path already has `HoodieSchema` available at the point where the Parquet → Spark type conversion happens. Flink's `ParquetSchemaConverter.convertToRowType(MessageType)` was originally a **standalone** conversion that only looked at the Parquet `MessageType`, with no Hudi Avro schema information available.
   
   ---
   
   ## Proposed Options
   
   ### Option A: Require `HoodieSchema` in all Flink Parquet-to-type conversions
   
   **Approach**: Make `HoodieSchema` a **required** parameter (not 
optional/nullable) in 
[`ParquetSchemaConverter.convertToRowType()`](https://github.com/apache/hudi/blob/master/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java#L77)
 and thread it through all Flink read entry points 
([`HoodieRowDataParquetReader`](https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java),
 
[`FlinkRowDataReaderContext`](https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java),
 `ClusteringOperator`, `CopyOnWriteInputFormat`, `MergeOnReadInputFormat`, 
etc.).
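
   A minimal sketch of the shape of the Option A change follows. The names below (`HudiLogicalTypeResolver`, `isVariant`, the placeholder mapping methods) are hypothetical stand-ins for whatever per-column lookup `HoodieSchema` actually exposes; the real converter handles far more cases:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public final class SchemaAwareConversionSketch {

  /** Hypothetical view over HoodieSchema: answers "which Hudi logical type is column X?". */
  public interface HudiLogicalTypeResolver {
    boolean isVariant(String fieldName);
    boolean isBlob(String fieldName);
    boolean isVector(String fieldName);
  }

  /**
   * Today the converter is convertToRowType(MessageType): Parquet schema only. Under Option A
   * the Hudi schema context becomes a required second argument, so every Flink entry point
   * (HoodieRowDataParquetReader, FlinkRowDataReaderContext, ClusteringOperator, ...) must supply it.
   */
  public static RowType convertToRowType(MessageType parquetSchema, HudiLogicalTypeResolver resolver) {
    List<String> names = new ArrayList<>();
    List<LogicalType> types = new ArrayList<>();
    for (Type field : parquetSchema.getFields()) {
      names.add(field.getName());
      if (resolver.isVariant(field.getName())) {
        // Map to the Flink-side Variant representation (FLIP-521) instead of the
        // physically identical ROW<metadata BYTES, value BYTES>.
        types.add(mapVariantField(field));
      } else {
        // Blob/Vector would branch the same way; everything else follows the existing mapping.
        types.add(mapRegularField(field));
      }
    }
    return RowType.of(types.toArray(new LogicalType[0]), names.toArray(new String[0]));
  }

  // Placeholders only: the actual mappings live in ParquetSchemaConverter and are elided here.
  private static LogicalType mapVariantField(Type field) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }

  private static LogicalType mapRegularField(Type field) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}
```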
   
   **Pros**:
   - Works immediately with current parquet-java 1.13.1
   - Handles all three types (Variant, Blob, Vector) uniformly
   - Can read unstructured data written by Spark (current Hudi builds) today
   - Consistent with Spark's approach (Spark always has `HoodieSchema` 
available at conversion time)
   
   **Cons**:
   - Touches many Flink read path entry points — potentially breaking internal 
APIs
   - Every new Flink reader/operator that does Parquet→type conversion must 
remember to supply the schema
   - Does this break any public connector APIs? (Need to verify — 
`HoodieTableFactory` / `DynamicTableSource` wiring)
   
   **Open question**: Does this actually break any *public-facing* API (`CREATE 
TABLE` DDL options, `FlinkOptions`, etc.), or only internal plumbing?
   
   ---
   
   ### Option B: Parquet file footer metadata (like `hoodie.vector.columns`)
   
   **Approach**: On the write side (both Spark and Flink), emit footer 
key-value metadata listing which columns are Variant/Blob (e.g., 
`hoodie.variant.columns`, `hoodie.blob.columns`). On the Flink read side, read 
footer metadata from `ParquetMetadata.getFileMetaData().getKeyValueMetaData()` 
and pass it into `ParquetSchemaConverter`.
   
   This follows the existing precedent of 
[`hoodie.vector.columns`](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java#L220)
 which is already written by 
[`HoodieRowParquetWriteSupport`](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java)
 and 
[`HoodieAvroWriteSupport`](https://github.com/apache/hudi/blob/master/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java).
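
   On the read side, the plumbing for Option B could look roughly like the sketch below. The `hoodie.variant.columns` key and the comma-separated value format are assumptions modeled on the existing `hoodie.vector.columns` convention, not an existing contract; the resulting column set would then be handed to `ParquetSchemaConverter`:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class FooterMetadataSketch {

  /** Returns the set of column names flagged as Variant in the file footer, if any. */
  public static Set<String> readVariantColumns(Configuration conf, Path parquetFile) throws IOException {
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(parquetFile, conf))) {
      Map<String, String> footer = reader.getFooter().getFileMetaData().getKeyValueMetaData();
      // Hypothetical key, mirroring the existing hoodie.vector.columns footer entry.
      String csv = footer.get("hoodie.variant.columns");
      return csv == null
          ? Collections.emptySet()
          : new HashSet<>(Arrays.asList(csv.split(",")));
    }
  }
}
```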
   
   **Pros**:
   - Self-describing: each Parquet file carries the information needed to infer 
types
   - No dependency on having the table-level `HoodieSchema` at read time
   - Follows existing Hudi conventions for vectors
   - Simpler integration — reader just needs to read footer KV metadata 
(already accessible via 
[`ParquetUtils.readFileMetadataOnly()`](https://github.com/apache/hudi/blob/master/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java#L245))
   
   **Cons**:
   - Still requires threading *something* (the metadata map) into 
`ParquetSchemaConverter` — similar plumbing to Option A, just a smaller object
   - **Backward compatibility**: Files written by older Hudi builds (before 
this change) won't have the footer metadata. Those files can only be read 
correctly with Option A (HoodieSchema threading) as a fallback
   - Write-side changes needed in both Spark (`HoodieRowParquetWriteSupport`) 
and Flink (`RowDataParquetWriteSupport`) writers
   - Doesn't help for files written by external systems (e.g., Spark 4.0 native 
Variant writes without Hudi)
   
   ---
   
   ### Can we use a custom Parquet LogicalType annotation via reflection for Blob/Vector (similar to how Variant has an "official" one in Parquet)?
   
   Parquet's `LogicalType` is a Thrift union with a fixed, numbered set of 
fields defined in the [parquet-format 
spec](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift).
 Variant was accepted into this spec as field `17: VariantType`, so 
parquet-java 1.15.2+ can natively write and read it. There doesn't seem to be an extension mechanism to register custom entries at runtime — the field numbers
are baked into compiled Thrift code. Since Blob and Vector have no equivalent 
entries in the Parquet format spec (and likely never will, since they're 
Hudi-specific concepts), there is no way to attach a self-describing annotation 
for them on a Parquet schema node, regardless of which parquet-java version you 
use. Adding them would require an upstream RFC to the parquet-format project 
itself.
   
   ---
   
   ## Recommendation / Discussion Questions
   
   1. **Should we combine approaches?** E.g., using the Parquet Variant annotation when available (per the section above) + Option B (footer metadata as fallback) + Option A (HoodieSchema threading as a last resort for legacy files)?
   
   2. **How important is reading legacy files?** If we're okay with "files written by Hudi builds before this change cannot have their unstructured types inferred by Flink without the table schema," then Option B alone might suffice going forward.
   
   3. **Does Option A break any public Flink connector API?** The 
`HoodieTableFactory` / `DynamicTableSource` path likely already has access to 
the table schema from the catalog. But what about raw `ParquetSchemaConverter` 
usage in user code?
   
   4. **Should we align Variant footer metadata with the existing vector 
pattern?** I.e., add `hoodie.variant.columns` and `hoodie.blob.columns` footer 
keys alongside the existing `hoodie.vector.columns`.
   
   ---
   
   ## Related PRs / Issues
   
   - [#18539](https://github.com/apache/hudi/pull/18539) — Flink Variant 
read/write (threads HoodieSchema, Option A approach)
   - [#18702](https://github.com/apache/hudi/pull/18702) — DataTypeAdapter for 
Variant (eliminates reflection for type access)
   - [#18506](https://github.com/apache/hudi/issues/18506) — Flink Vector type 
support
   - [FLIP-521](https://cwiki.apache.org/confluence/display/FLINK/FLIP-521) — 
Flink native Variant type (2.1+)

