danny0405 opened a new pull request, #18717: URL: https://github.com/apache/hudi/pull/18717
### Describe the issue this Pull Request addresses

Flink 2.1 native `VARIANT` reads currently fail in Hudi's vectorized Parquet path because the Flink 2.1 `ParquetSplitReaderUtil` does not handle `LogicalTypeRoot.VARIANT`. Spark 4.0 writes unshredded Variant values to Parquet as a group with binary `value` and `metadata` children, but the reader previously fell through to the unsupported default branch.

This PR adds native Flink 2.1 Variant read support, updates compatibility tests to assert Flink `BinaryVariant`, and migrates related CI/build profiles from older Flink profiles to Flink 2.1.

### Summary and Changelog

#### Working tree: Add native Flink 2.1 Variant Parquet reader support

- Added `VARIANT` handling in `hudi-flink2.1.x` `ParquetSplitReaderUtil`.
- Allocates Variant vectors as `HeapRowColumnVector` with `HeapBytesVector` children in Flink's expected `value`, `metadata` order.
- Builds a matching `RowColumnReader` using `BytesColumnReader`s for Spark's Parquet Variant `value` and `metadata` fields.
- Adds validation for missing or non-binary Variant child fields with clear `IllegalArgumentException` messages.
- Adds null/default vector handling for missing Variant columns during schema evolution.

#### Working tree: Update Variant tests for native Flink Variant

- Changed `ITTestVariantCrossEngineCompatibility` DDL from `ROW<metadata BYTES, value BYTES>` to native `VARIANT`.
- Updated assertions to verify returned values as `BinaryVariant` via `getValue()` and `getMetadata()`.
- Re-enabled Spark Variant cross-engine COW and MOR compatibility tests.
- Updated `TestHoodieSchemaConverter` expectations to assert Flink `VariantType`.

#### Working tree: Move Flink CI/build coverage to Flink 2.1

- Updated root Maven defaults from Flink 1.20 to Flink 2.1, including module, bundle version, Parquet version, Kafka connector version, and Hadoop compatibility artifact.
- Switched the active-by-default Flink Maven profile from `flink1.20` to `flink2.1`.
- Updated GitHub workflow Flink jobs and integration-test matrices from `flink1.20` to `flink2.1`.
- Updated `azure-pipelines-20230430.yml` test profiles from `-Dflink1.18` to `-Dflink2.1`.

### Impact

- **Functional impact**: Flink 2.1 can read Spark-written unshredded Parquet Variant columns from Hudi tables as native `BinaryVariant`.
- **Maintainability**: Keeps Variant read logic localized to the Flink 2.1 Parquet reader and uses existing vectorized row/bytes reader abstractions.
- **Extensibility**: Establishes the `value`, `metadata` backing-vector layout for future native Variant read paths and schema-evolution behavior.
- **Risk**: Medium, because this touches Flink profile defaults, CI configuration, and the vectorized Parquet reader. Risk is mitigated by targeted Variant compatibility tests and schema converter coverage.
- **Validation**: Passed `ITTestVariantCrossEngineCompatibility#testFlinkReadSparkVariantMORTableWithSpark`, full `ITTestVariantCrossEngineCompatibility`, and `TestHoodieSchemaConverter` under the Flink 2.1 profile.
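
For readers following the changelog, the Variant child-field validation and the `value`, `metadata` ordering could be sketched roughly as below. This is not the code from the PR: `VariantFieldCheck`, `Field`, and `orderedVariantChildren` are hypothetical stand-ins, and the exception messages are illustrative, so the example runs without Flink or Parquet on the classpath. The actual change operates on Parquet schema types inside `ParquetSplitReaderUtil`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VariantFieldCheck {

    /** Hypothetical stand-in for a Parquet field: a name plus whether it is a binary primitive. */
    public static final class Field {
        public final String name;
        public final boolean binary;

        public Field(String name, boolean binary) {
            this.name = name;
            this.binary = binary;
        }
    }

    /** Child order the PR describes as Flink's expected backing-vector layout. */
    public static final List<String> VARIANT_CHILDREN = Arrays.asList("value", "metadata");

    /**
     * Returns the children of a Variant group in value, metadata order,
     * regardless of the order in which they appear in the file schema.
     * Throws IllegalArgumentException if a child is missing or not binary.
     */
    public static List<Field> orderedVariantChildren(String column, List<Field> groupFields) {
        Map<String, Field> byName = new HashMap<>();
        for (Field f : groupFields) {
            byName.put(f.name, f);
        }
        List<Field> ordered = new ArrayList<>();
        for (String child : VARIANT_CHILDREN) {
            Field f = byName.get(child);
            if (f == null) {
                throw new IllegalArgumentException(
                    "Variant column '" + column + "' is missing child field '" + child + "'");
            }
            if (!f.binary) {
                throw new IllegalArgumentException(
                    "Variant column '" + column + "' child field '" + child + "' must be binary");
            }
            ordered.add(f);
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Made-up file schema that lists metadata before value.
        List<Field> fileOrder = Arrays.asList(new Field("metadata", true), new Field("value", true));
        for (Field f : orderedVariantChildren("v", fileOrder)) {
            System.out.println(f.name); // prints "value", then "metadata"
        }
    }
}
```

Resolving children by name rather than by position keeps the reader independent of the order in which the writer laid out the group, which is one plausible reason to validate both fields up front before building the column readers.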
