kbuci opened a new issue, #18711:
URL: https://github.com/apache/hudi/issues/18711
## Problem Statement
Flink's type conversion process currently cannot map Hudi's unstructured
logical types (**Variant**, **Blob**, **Vector**) 1:1 to the appropriate Flink
data types from the Parquet schema alone. The Parquet physical layout for these
types is ambiguous without additional context:
| Hudi Type | Parquet Physical Layout | Ambiguity |
|-----------|------------------------|-----------|
| **Variant** | `GROUP { required binary metadata; required binary value; }` | Indistinguishable from a user struct `ROW<metadata BYTES, value BYTES>` |
| **Vector** | `FIXED_LEN_BYTE_ARRAY(N)` | Indistinguishable from any fixed-length binary field |
| **Blob** | Nested group with canonical fields (`type`, `data`, `reference`) | Indistinguishable from a user struct with those field names |
This is because Hudi compiles against **parquet-java 1.13.1** by default
([pom.xml L118](https://github.com/apache/hudi/blob/master/pom.xml#L118)),
which predates `VariantLogicalTypeAnnotation` (introduced in parquet-java
1.15.2+). There is no standard Parquet `LogicalTypeAnnotation` for Blob or
Vector at all.
### How does the Spark path handle this today?
On the **Spark side**, Hudi uses other mechanisms rather than Parquet schema
annotations:
- **Vector**: Writes `hoodie.vector.columns` key-value metadata in the
Parquet file footer
([HoodieRowParquetWriteSupport.java](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java)),
plus `hudi_type` metadata on Spark `StructField`. Detection on read uses the
requested `HoodieSchema` or Spark field metadata — never the Parquet column
schema alone
([VectorConversionUtils.java](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java)).
- **Blob**: Uses Avro `logicalType: blob` and Spark `hudi_type: BLOB`
metadata. No dedicated Parquet footer key.
- **Variant**: Currently no footer metadata. The Parquet
`VariantLogicalTypeAnnotation` is only available at runtime if parquet-java ≥
1.15.2 is on the classpath. The existing `hasVariantAnnotation()` check uses
**class-name string matching** to avoid a compile-time dependency
([ParquetSchemaConverter.java
L430](https://github.com/apache/hudi/blob/master/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java#L430)).
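As a rough illustration, the class-name matching trick can be sketched as
follows (the class and method names here are stand-ins for demonstration, not
the actual Hudi code):

```java
// Sketch (not the actual Hudi implementation): recognize Parquet's
// VariantLogicalTypeAnnotation by its simple class name, so the code compiles
// against parquet-java 1.13.1 but still detects the annotation when a
// 1.15.2+ jar happens to be on the classpath at runtime.
public class VariantAnnotationCheck {

  static boolean hasVariantAnnotation(Object logicalTypeAnnotation) {
    return logicalTypeAnnotation != null
        && "VariantLogicalTypeAnnotation"
            .equals(logicalTypeAnnotation.getClass().getSimpleName());
  }

  // Stand-in for parquet-java 1.15.2+'s annotation class, used only for demo.
  static class VariantLogicalTypeAnnotation {}

  public static void main(String[] args) {
    System.out.println(hasVariantAnnotation(new VariantLogicalTypeAnnotation())); // true
    System.out.println(hasVariantAnnotation("just a string"));                    // false
  }
}
```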
### Why is this harder for Flink than Spark?
Spark's read path already has `HoodieSchema` available at the point where
Parquet → Spark type conversion happens. Flink's
`ParquetSchemaConverter.convertToRowType(MessageType)` was originally a
**standalone** conversion that only looked at the Parquet `MessageType` — no
Hudi Avro schema information.
---
## Proposed Options
### Option A: Require `HoodieSchema` in all Flink Parquet-to-type conversions
**Approach**: Make `HoodieSchema` a **required** parameter (not
optional/nullable) in
[`ParquetSchemaConverter.convertToRowType()`](https://github.com/apache/hudi/blob/master/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java#L77)
and thread it through all Flink read entry points
([`HoodieRowDataParquetReader`](https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java),
[`FlinkRowDataReaderContext`](https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java),
`ClusteringOperator`, `CopyOnWriteInputFormat`, `MergeOnReadInputFormat`,
etc.).
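In spirit, the converter change amounts to consulting the threaded schema
whenever the physical layout is ambiguous. A minimal sketch, where the
`HudiType` enum and the name-to-type map are illustrative stand-ins for the
real `HoodieSchema` / `MessageType` plumbing:

```java
import java.util.Map;

// Sketch of Option A's core idea: the Parquet layout alone says "struct" (or
// "fixed binary"), so the converter must consult the required HoodieSchema
// (modeled here as a field-name -> logical-type map) to pick the right Flink
// type. All names here are stand-ins, not Hudi's actual classes.
public class SchemaThreadedConversion {

  enum HudiType { VARIANT, BLOB, VECTOR, STRUCT }

  // With only the Parquet MessageType, every ambiguous column would default
  // to STRUCT; the threaded schema overrides that default.
  static HudiType resolve(String fieldName, Map<String, HudiType> hoodieSchema) {
    return hoodieSchema.getOrDefault(fieldName, HudiType.STRUCT);
  }

  public static void main(String[] args) {
    Map<String, HudiType> schema = Map.of("payload", HudiType.VARIANT,
                                          "embedding", HudiType.VECTOR);
    System.out.println(resolve("payload", schema));   // VARIANT
    System.out.println(resolve("embedding", schema)); // VECTOR
    System.out.println(resolve("other", schema));     // STRUCT
  }
}
```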
**Pros**:
- Works immediately with current parquet-java 1.13.1
- Handles all three types (Variant, Blob, Vector) uniformly
- Can read unstructured data written by Spark (current Hudi builds) today
- Consistent with Spark's approach (Spark always has `HoodieSchema`
available at conversion time)
**Cons**:
- Touches many Flink read path entry points — potentially breaking internal
APIs
- Every new Flink reader/operator that does Parquet→type conversion must
remember to supply the schema
- Does this break any public connector APIs? (Need to verify —
`HoodieTableFactory` / `DynamicTableSource` wiring)
**Open question**: Does this actually break any *public-facing* API (`CREATE
TABLE` DDL options, `FlinkOptions`, etc.), or only internal plumbing?
---
### Option B: Parquet file footer metadata (like `hoodie.vector.columns`)
**Approach**: On the write side (both Spark and Flink), emit footer
key-value metadata listing which columns are Variant/Blob (e.g.,
`hoodie.variant.columns`, `hoodie.blob.columns`). On the Flink read side, read
footer metadata from `ParquetMetadata.getFileMetaData().getKeyValueMetaData()`
and pass it into `ParquetSchemaConverter`.
This follows the existing precedent of
[`hoodie.vector.columns`](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java#L220)
which is already written by
[`HoodieRowParquetWriteSupport`](https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java)
and
[`HoodieAvroWriteSupport`](https://github.com/apache/hudi/blob/master/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java).
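To make the round trip concrete, here is a stdlib-only sketch; the key names
mirror `hoodie.vector.columns`, but the exact key names and the
comma-separated encoding are assumptions for illustration, not decided API:

```java
import java.util.*;

// Sketch of Option B: the writer serializes unstructured-column names into
// the Parquet footer key-value map; the reader parses them back before type
// conversion. The proposed key names and the comma-separated encoding are
// assumptions, not finalized Hudi conventions.
public class FooterTypeHints {

  static final String VARIANT_KEY = "hoodie.variant.columns";
  static final String BLOB_KEY = "hoodie.blob.columns";

  // Write side: emit the hints into the footer KV map.
  static void writeHints(Map<String, String> footer,
                         List<String> variantCols, List<String> blobCols) {
    if (!variantCols.isEmpty()) {
      footer.put(VARIANT_KEY, String.join(",", variantCols));
    }
    if (!blobCols.isEmpty()) {
      footer.put(BLOB_KEY, String.join(",", blobCols));
    }
  }

  // Read side: recover the column set; empty means the file predates the hints.
  static Set<String> readHint(Map<String, String> footer, String key) {
    String v = footer.get(key);
    return v == null ? Collections.emptySet()
                     : new HashSet<>(Arrays.asList(v.split(",")));
  }

  public static void main(String[] args) {
    Map<String, String> footer = new HashMap<>();
    writeHints(footer, List.of("payload"), List.of());
    System.out.println(readHint(footer, VARIANT_KEY)); // [payload]
    System.out.println(readHint(footer, BLOB_KEY));    // []
  }
}
```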
**Pros**:
- Self-describing: each Parquet file carries the information needed to infer
types
- No dependency on having the table-level `HoodieSchema` at read time
- Follows existing Hudi conventions for vectors
- Simpler integration — reader just needs to read footer KV metadata
(already accessible via
[`ParquetUtils.readFileMetadataOnly()`](https://github.com/apache/hudi/blob/master/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java#L245))
**Cons**:
- Still requires threading *something* (the metadata map) into
`ParquetSchemaConverter` — similar plumbing to Option A, just a smaller object
- **Backward compatibility**: Files written by older Hudi builds (before
this change) won't have the footer metadata. Those files can only be read
correctly with Option A (HoodieSchema threading) as a fallback
- Write-side changes needed in both Spark (`HoodieRowParquetWriteSupport`)
and Flink (`RowDataParquetWriteSupport`) writers
- Doesn't help for files written by external systems (e.g., Spark 4.0 native
Variant writes without Hudi)
---
### Option C: Can we use a custom Parquet `LogicalType` annotation for
Blob/Vector (similar to how Variant has an "official" one in Parquet)?
Parquet's `LogicalType` is a Thrift union with a fixed, numbered set of
fields defined in the [parquet-format
spec](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift).
Variant was accepted into this spec as field `17: VariantType`, so
parquet-java 1.15.2+ can natively write and read it. There doesn't seem to be a
extension mechanism to register custom entries at runtime — the field numbers
are baked into compiled Thrift code. Since Blob and Vector have no equivalent
entries in the Parquet format spec (and likely never will, since they're
Hudi-specific concepts), there is no way to attach a self-describing annotation
for them on a Parquet schema node, regardless of which parquet-java version you
use. Adding them would require an upstream RFC to the parquet-format project
itself.
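For completeness, even probing whether the official Variant annotation is
available can be done without a compile-time dependency. Note the
fully-qualified class name below is my assumption (parquet-java nests
annotation classes inside `LogicalTypeAnnotation`) and should be verified
against the actual jar:

```java
// Sketch: probe the classpath for parquet-java 1.15.2+'s Variant annotation
// without a compile-time dependency. The fully-qualified class name is an
// assumption based on parquet-java's convention of nesting annotation
// classes inside LogicalTypeAnnotation; verify before relying on it.
public class VariantAnnotationAvailability {

  static boolean variantAnnotationAvailable() {
    try {
      Class.forName(
          "org.apache.parquet.schema.LogicalTypeAnnotation$VariantLogicalTypeAnnotation");
      return true;
    } catch (ClassNotFoundException e) {
      // Running against parquet-java < 1.15.2 (e.g., Hudi's default 1.13.1).
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(variantAnnotationAvailable());
  }
}
```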
---
## Recommendation / Discussion Questions
1. **Should we combine approaches?** E.g., Option C (annotation when
available) + Option B (footer metadata as fallback) + Option A (HoodieSchema
threading as last resort for legacy files)?
2. **How important is reading legacy files?** If we're okay with "files
written by Hudi builds before this change cannot have their unstructured types
inferred by Flink without the table schema," then Option B alone might suffice
going forward.
3. **Does Option A break any public Flink connector API?** The
`HoodieTableFactory` / `DynamicTableSource` path likely already has access to
the table schema from the catalog. But what about raw `ParquetSchemaConverter`
usage in user code?
4. **Should we align Variant footer metadata with the existing vector
pattern?** I.e., add `hoodie.variant.columns` and `hoodie.blob.columns` footer
keys alongside the existing `hoodie.vector.columns`.
---
## Related PRs / Issues
- [#18539](https://github.com/apache/hudi/pull/18539) — Flink Variant
read/write (threads HoodieSchema, Option A approach)
- [#18702](https://github.com/apache/hudi/pull/18702) — DataTypeAdapter for
Variant (eliminates reflection for type access)
- [#18506](https://github.com/apache/hudi/issues/18506) — Flink Vector type
support
- [FLIP-521](https://cwiki.apache.org/confluence/display/FLINK/FLIP-521) —
Flink native Variant type (2.1+)