This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 71aa121035cf fix(flink): enforce Parquet VARIANT annotation in Flink
schema conversion for unshredded variant (#18539)
71aa121035cf is described below
commit 71aa121035cf912770f6ad4bf157a945ae3a8409
Author: Krishen <[email protected]>
AuthorDate: Mon May 18 21:26:41 2026 -0700
fix(flink): enforce Parquet VARIANT annotation in Flink schema conversion
for unshredded variant (#18539)
* feat(flink): write/read unshredded variant to Flink parquet file
writers/readers using Flink's Variant type
---------
Co-authored-by: Krishen Bhan <[email protected]>
Co-authored-by: Cursor <[email protected]>
---
.../row/parquet/ParquetSchemaConverter.java | 31 ++++-
.../apache/hudi/util/HoodieSchemaConverter.java | 4 +
.../row/parquet/TestParquetSchemaConverter.java | 148 +++++++++++++++++++++
.../hudi/util/TestHoodieSchemaConverter.java | 34 ++++-
.../ITTestVariantCrossEngineCompatibility.java | 3 +-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 20 ++-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 20 ++-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 20 ++-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 20 ++-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 20 ++-
.../org/apache/hudi/adapter/DataTypeAdapter.java | 35 +++++
11 files changed, 323 insertions(+), 32 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 668098314d48..dd19100f8ccd 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -49,6 +49,17 @@ import static
org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
/**
* Schema converter converts Parquet schema to and from Flink internal types.
*
+ * <p>On reads, this converter performs best-effort physical type mapping. It
detects the
+ * Parquet {@code VARIANT} annotation and will reject shredded variants. Blob
and Vector types
+ * cannot be distinguished from ordinary binary columns via Parquet schema
alone.
+ *
+ * <p>On writes, this converter maps Flink {@code VariantType} to the
canonical unshredded Parquet
+ * layout (group with binary metadata + value fields). The VARIANT logical
type annotation is
+ * resolved by {@link DataTypeAdapter#variantParquetAnnotation()} — on Flink
2.1+ with
+ * parquet-java 1.16.0+ the annotation is attached automatically; on pre-2.1
Flink or with
+ * parquet < 1.16.0 the write throws {@link UnsupportedOperationException}
because writing
+ * variant data without the annotation would produce files that no reader can
identify as variant.
+ *
* <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter
to support timestamp of INT64 8 bytes.
*/
@Slf4j
@@ -158,6 +169,9 @@ public class ParquetSchemaConverter {
convertToRowField(keyValueType.getLeft()).getType().copy(true),
convertToRowField(keyValueType.getRight()).getType()));
} else if (hasVariantAnnotation(logicalType)) {
+ // Fires for files written with parquet-java that carry the VARIANT
annotation.
+ // The reader infers the Flink RowType from the Parquet footer via
convertToRowType(),
+ // so this annotation detection is the primary mechanism for
recognizing Variant columns.
if (isShreddedVariant(groupType)) {
throw new UnsupportedOperationException(
"Shredded Variant is not supported in Flink. "
@@ -223,10 +237,25 @@ public class ParquetSchemaConverter {
/**
* Converts a Variant column to the canonical unshredded Parquet layout:
* a group with required binary {@code metadata} and required binary {@code
value}.
+ *
+ * <p>No shredded-variant guard is needed here: Flink 2.1's {@code
VariantType} is a single
+ * atomic {@code LogicalTypeRoot.VARIANT} with no shredding representation
(FLIP-521 scopes
+ * shredding out), so a shredded variant can never arrive as a Flink
LogicalType.
+ *
+ * <p>Delegates to {@link DataTypeAdapter#variantParquetAnnotation()} for
the VARIANT logical
+ * type annotation. On Flink < 2.1 this throws (variant writes are
unsupported). On Flink 2.1+
+ * with parquet-java < 1.16.0 this also throws, because writing variant data
without the
+ * annotation would produce files that no reader can identify as variant.
*/
private static Type convertVariantToParquetType(String name, Type.Repetition
repetition) {
- // TODO: add .as(LogicalTypeAnnotation.variantType()) once parquet-java is
bumped to 1.16.0
+ LogicalTypeAnnotation annotation =
DataTypeAdapter.variantParquetAnnotation()
+ .orElseThrow(() -> new UnsupportedOperationException(
+ "Cannot write Variant columns: parquet-java 1.16.0+ is required to
emit the VARIANT "
+ + "logical type annotation. Without the annotation, readers
cannot identify the "
+ + "column as Variant. Current parquet-java version does not
support "
+ + "LogicalTypeAnnotation.variantType()."));
return Types.buildGroup(repetition)
+ .as(annotation)
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
.named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c49142800b45..c413101dde46 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -70,6 +70,10 @@ public class HoodieSchemaConverter {
* <p>The "{rowName}." is used as the nested row type name prefix in order
to generate
* the right schema. Nested record types that only differ by type name are
still compatible.
*
+ * <p>On Flink 2.1+, {@code LogicalTypeRoot.VARIANT} is detected via string
comparison
+ * (to avoid compile-time dependency) and mapped to {@link
HoodieSchema#createVariant()}.
+ * Pre-2.1 Flink does not support Variant.
+ *
* @param logicalType Flink logical type
* @param rowName the record name
* @return HoodieSchema matching this logical type
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 318f2034d175..403b84068496 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -18,6 +18,10 @@
package org.apache.hudi.io.storage.row.parquet;
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
@@ -27,13 +31,19 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -41,6 +51,9 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link ParquetSchemaConverter}.
@@ -216,4 +229,139 @@ public class TestParquetSchemaConverter {
+ "}\n";
assertThat(messageType.toString(), is(expected));
}
+
+ /**
+ * A Parquet group with metadata + value binary fields but NO VARIANT
annotation must be
+ * treated as a plain ROW. Only the Parquet {@code VARIANT} annotation
triggers variant
+ * detection in this converter; unannotated groups are never guessed as
variant.
+ */
+ @Test
+ void testVariantPhysicalLayoutTreatedAsRow() {
+ MessageType variantParquet = new MessageType(
+ "test",
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.REQUIRED).named("id"),
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("metadata"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("value"))
+ .named("data"));
+
+ RowType rowType = ParquetSchemaConverter.convertToRowType(variantParquet);
+ assertEquals(2, rowType.getFieldCount());
+ assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+ }
+
+ /**
+ * Unannotated group with metadata + value + typed_value (3 fields) is
treated as a generic
+ * ROW when no annotation or schema hint is present.
+ */
+ @Test
+ void testUnannotatedShreddedGroupTreatedAsRow() {
+ MessageType shreddedNoAnnotation = new MessageType(
+ "test",
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.REQUIRED).named("id"),
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("metadata"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("value"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.OPTIONAL).named("typed_value"))
+ .named("data"));
+
+ RowType rowType =
ParquetSchemaConverter.convertToRowType(shreddedNoAnnotation);
+ assertEquals(2, rowType.getFieldCount());
+ assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+ }
+
+ /**
+ * On Flink 2.1+ with parquet 1.16.0+, converting a RowType containing a
Variant column to a
+ * Parquet MessageType should produce a group with the VARIANT annotation
and required binary
+ * {@code metadata} and {@code value} fields.
+ * On pre-2.1 Flink this test returns early since VariantType does not exist.
+ * On parquet < 1.16.0 the write is expected to fail (annotation
unavailable).
+ */
+ @Test
+ void testVariantWritePathProducesCorrectLayout() {
+ LogicalType variantType;
+ try {
+ variantType = DataTypeAdapter.createVariantType().getLogicalType();
+ } catch (UnsupportedOperationException e) {
+ // Pre-2.1 Flink: VariantType doesn't exist, skip
+ return;
+ }
+
+ RowType rowType = RowType.of(
+ new LogicalType[]{new IntType(), variantType},
+ new String[]{"id", "data"});
+
+ if (!DataTypeAdapter.variantParquetAnnotation().isPresent()) {
+ // parquet < 1.16.0: write must fail because annotation is unavailable
+ UnsupportedOperationException ex =
org.junit.jupiter.api.Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> ParquetSchemaConverter.convertToParquetMessageType("test",
rowType));
+ assertTrue(ex.getMessage().contains("parquet-java 1.16.0+"),
+ "Error message should mention parquet version requirement");
+ return;
+ }
+
+ // parquet 1.16.0+: write succeeds with annotation
+ MessageType messageType =
ParquetSchemaConverter.convertToParquetMessageType("test", rowType);
+ assertEquals(2, messageType.getFieldCount());
+
+ Type variantField = messageType.getType("data");
+ assertTrue(variantField instanceof GroupType, "Variant column should be a
Parquet group");
+ GroupType variantGroup = (GroupType) variantField;
+ assertEquals(2, variantGroup.getFieldCount());
+ assertEquals(HoodieSchema.Variant.VARIANT_METADATA_FIELD,
variantGroup.getType(0).getName());
+ assertEquals(HoodieSchema.Variant.VARIANT_VALUE_FIELD,
variantGroup.getType(1).getName());
+ assertTrue(variantGroup.getType(0).isPrimitive());
+ assertTrue(variantGroup.getType(1).isPrimitive());
+ assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+ variantGroup.getType(0).asPrimitiveType().getPrimitiveTypeName());
+ assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+ variantGroup.getType(1).asPrimitiveType().getPrimitiveTypeName());
+ assertNotNull(variantGroup.getLogicalTypeAnnotation(),
+ "Variant group must carry the VARIANT annotation");
+ }
+
+ /**
+ * Verifies that writing a Variant column fails with a clear error when
parquet-java on the
+ * classpath does not support the VARIANT annotation (< 1.16.0). On pre-2.1
Flink the adapter
+ * throws directly; on Flink 2.1+ with parquet < 1.16.0 the write path
throws.
+ */
+ @Test
+ void testVariantWriteFailsWithoutAnnotation() {
+ Option<LogicalTypeAnnotation> annotationOpt;
+ try {
+ annotationOpt = DataTypeAdapter.variantParquetAnnotation();
+ } catch (UnsupportedOperationException e) {
+ // Pre-2.1 Flink: expected to throw from the adapter
+ assertTrue(e.getMessage().contains("VARIANT type is only supported in
Flink 2.1+"));
+ return;
+ }
+
+ if (annotationOpt.isPresent()) {
+ // parquet 1.16.0+: annotation is available, write succeeds — nothing to
test here
+ return;
+ }
+
+ // Flink 2.1 + parquet < 1.16.0: annotation is empty, write must fail
+ LogicalType variantType =
DataTypeAdapter.createVariantType().getLogicalType();
+ RowType rowType = RowType.of(
+ new LogicalType[]{new IntType(), variantType},
+ new String[]{"id", "data"});
+
+ UnsupportedOperationException ex =
org.junit.jupiter.api.Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> ParquetSchemaConverter.convertToParquetMessageType("test",
rowType));
+ assertTrue(ex.getMessage().contains("parquet-java 1.16.0+"),
+ "Error message should mention the parquet version requirement");
+ assertTrue(ex.getMessage().contains("VARIANT"),
+ "Error message should mention VARIANT");
+ }
+
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 9a4f920b859e..9f6ad8f53bf7 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -695,19 +695,16 @@ public class TestHoodieSchemaConverter {
@Test
public void testVariantTypeConversion() {
- // Test direct Variant conversion
HoodieSchema variantSchema = HoodieSchema.createVariant();
DataType dataType = HoodieSchemaConverter.convertToDataType(variantSchema);
assertNotNull(dataType);
- // Verify it's a Variant
assertThat("the return type should be variant",
dataType.getLogicalType().asSummaryString(), is("VARIANT NOT NULL"));
}
@Test
public void testVariantInRecordConversion() {
- // Test Variant field within a record
HoodieSchema recordWithVariant = HoodieSchema.createRecord(
"test_record",
null,
@@ -722,11 +719,40 @@ public class TestHoodieSchemaConverter {
assertEquals(2, result.getFieldCount());
assertEquals("data", result.getFieldNames().get(1));
- // Verify variant field
assertThat("the return type should be variant",
result.getTypeAt(1).asSummaryString(), is("VARIANT NOT NULL"));
}
+ @Test
+ public void testVariantInArrayConversion() {
+ HoodieSchema arrayOfVariant =
HoodieSchema.createArray(HoodieSchema.createVariant());
+ DataType dataType =
HoodieSchemaConverter.convertToDataType(arrayOfVariant);
+ assertNotNull(dataType);
+ assertInstanceOf(ArrayType.class, dataType.getLogicalType());
+ LogicalType elementType = ((ArrayType)
dataType.getLogicalType()).getElementType();
+ assertEquals("VARIANT", elementType.getTypeRoot().name());
+ }
+
+ @Test
+ public void testVariantInMapConversion() {
+ HoodieSchema mapOfVariant =
HoodieSchema.createMap(HoodieSchema.createVariant());
+ DataType dataType = HoodieSchemaConverter.convertToDataType(mapOfVariant);
+ assertNotNull(dataType);
+ assertInstanceOf(MapType.class, dataType.getLogicalType());
+ LogicalType valueType = ((MapType)
dataType.getLogicalType()).getValueType();
+ assertEquals("VARIANT", valueType.getTypeRoot().name());
+ }
+
+ @Test
+ public void testShreddedVariantConversionThrows() {
+ HoodieSchema.Variant shredded = HoodieSchema.createVariantShredded(
+ HoodieSchema.create(HoodieSchemaType.STRING));
+ UnsupportedOperationException ex = assertThrows(
+ UnsupportedOperationException.class,
+ () -> HoodieSchemaConverter.convertToDataType(shredded));
+ assertTrue(ex.getMessage().contains("Shredded Variant is not yet supported
in Flink"));
+ }
+
@Test
public void testBlobStructureValidation() {
// Positive case: Create ROW matching BLOB structure
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
index de6e3835d120..e7f9ed3766f1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
@@ -41,6 +41,7 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+
/**
* Integration test for cross-engine compatibility - verifying that Flink can
read Variant tables written by Spark 4.0.
*/
@@ -56,7 +57,6 @@ public class ITTestVariantCrossEngineCompatibility {
private void verifyFlinkCanReadSparkVariantTable(String tablePath, String
tableType, String testDescription) throws Exception {
TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
- // Create a Hudi table pointing to the Spark-written data
String createTableDdl = String.format(
"CREATE TABLE variant_table ("
+ " id INT,"
@@ -73,7 +73,6 @@ public class ITTestVariantCrossEngineCompatibility {
tableEnv.executeSql(createTableDdl);
- // Query the table to verify Flink can read the data
TableResult result = tableEnv.executeSql("SELECT id, name, v, ts FROM
variant_table ORDER BY id");
List<Row> rows = CollectionUtil.iteratorToList(result.collect());
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
---
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+ private static final String VARIANT_UNSUPPORTED_MSG =
+ "VARIANT type is only supported in Flink 2.1+. "
+ + "Please upgrade your Flink version to use Variant columns.";
+
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static Object createVariant(byte[] value, byte[] metadata) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
}
public static DataType createVariantType() {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantMetadata(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantValue(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
---
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+ private static final String VARIANT_UNSUPPORTED_MSG =
+ "VARIANT type is only supported in Flink 2.1+. "
+ + "Please upgrade your Flink version to use Variant columns.";
+
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static Object createVariant(byte[] value, byte[] metadata) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
}
public static DataType createVariantType() {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantMetadata(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantValue(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
---
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+ private static final String VARIANT_UNSUPPORTED_MSG =
+ "VARIANT type is only supported in Flink 2.1+. "
+ + "Please upgrade your Flink version to use Variant columns.";
+
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static Object createVariant(byte[] value, byte[] metadata) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
}
public static DataType createVariantType() {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantMetadata(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantValue(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
---
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+ private static final String VARIANT_UNSUPPORTED_MSG =
+ "VARIANT type is only supported in Flink 2.1+. "
+ + "Please upgrade your Flink version to use Variant columns.";
+
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static Object createVariant(byte[] value, byte[] metadata) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
}
public static DataType createVariantType() {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantMetadata(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantValue(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 566ca723648a..18db48f0fed1 100644
---
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+ private static final String VARIANT_UNSUPPORTED_MSG =
+ "VARIANT type is only supported in Flink 2.1+. "
+ + "Please upgrade your Flink version to use Variant columns.";
+
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static Object createVariant(byte[] value, byte[] metadata) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
}
public static DataType createVariantType() {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantMetadata(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
public static byte[] getVariantValue(Object obj) {
- throw new UnsupportedOperationException("Variant is not supported yet.");
+ throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
}
}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 6ae4c4a6baba..77d6f708ece3 100644
---
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -25,11 +25,46 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+import java.lang.reflect.Method;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+
+ /**
+ * The Parquet Variant binary format specification version passed to
+ * {@code LogicalTypeAnnotation.variantType(byte)}. Version 1 is the initial
spec
+ * defined by the Parquet Variant proposal (parquet-format 2.11.0 /
parquet-java 1.16.0).
+ */
+ private static final byte VARIANT_SPEC_VERSION = 1;
+
+ /**
+ * Cached VARIANT annotation resolved via reflection. Empty if parquet-java
+ * on the classpath predates {@code LogicalTypeAnnotation.variantType()} (<
1.16.0).
+ */
+ private static final Option<LogicalTypeAnnotation> VARIANT_ANNOTATION =
resolveVariantAnnotation();
+
+ private static Option<LogicalTypeAnnotation> resolveVariantAnnotation() {
+ try {
+ Method factory = LogicalTypeAnnotation.class.getMethod("variantType",
byte.class);
+ return Option.of((LogicalTypeAnnotation) factory.invoke(null,
VARIANT_SPEC_VERSION));
+ } catch (Exception e) {
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Returns the Parquet VARIANT {@link LogicalTypeAnnotation} if parquet-java
1.16.0+ is on the
+ * classpath, or empty if the {@code variantType(byte)} factory is unavailable.
+ */
+ public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+ return VARIANT_ANNOTATION;
+ }
+
public static Variant getVariant(RowData rowData, int pos) {
return rowData.getVariant(pos);
}