hudi-agent commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3239609249
##########
hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java:
##########
@@ -25,11 +25,38 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.flink.types.variant.Variant;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+import java.lang.reflect.Method;
/**
* Adapter utils to provide {@code DataType} utilities.
*/
public class DataTypeAdapter {
+
+ /**
+ * Cached VARIANT annotation resolved via reflection, or {@code null} if
parquet-java
+ * on the classpath predates {@code LogicalTypeAnnotation.variantType()} (<
1.16.0).
+ */
+ private static final LogicalTypeAnnotation VARIANT_ANNOTATION =
resolveVariantAnnotation();
+
+ private static LogicalTypeAnnotation resolveVariantAnnotation() {
+ try {
+ Method factory = LogicalTypeAnnotation.class.getMethod("variantType",
byte.class);
+ return (LogicalTypeAnnotation) factory.invoke(null, (byte) 1);
Review Comment:
🤖 nit: the bare `(byte) 1` is opaque — could you pull it into a named
constant (e.g. `VARIANT_SPEC_VERSION`) or add a short comment explaining it's
the variant spec version byte expected by `variantType(byte)`? Future readers
shouldn't have to dig into parquet-java to know what `1` means.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java:
##########
@@ -216,4 +228,117 @@ void testConvertTimestampTypes() {
+ "}\n";
assertThat(messageType.toString(), is(expected));
}
+
+ /**
+ * A Parquet group with metadata + value binary fields but NO VARIANT
annotation must be
+ * treated as a plain ROW. Only the Parquet {@code VARIANT} annotation
triggers variant
+ * detection in this converter; unannotated groups are never guessed as
variant.
+ */
+ @Test
+ void testVariantPhysicalLayoutTreatedAsRow() {
+ MessageType variantParquet = new MessageType(
+ "test",
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.REQUIRED).named("id"),
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("metadata"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("value"))
+ .named("data"));
+
+ RowType rowType = ParquetSchemaConverter.convertToRowType(variantParquet);
+ assertEquals(2, rowType.getFieldCount());
+ assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+ }
+
+ /**
+ * Unannotated group with metadata + value + typed_value (3 fields) is
treated as a generic
+ * ROW when no annotation or schema hint is present.
+ */
+ @Test
+ void testUnannotatedShreddedGroupTreatedAsRow() {
+ MessageType shreddedNoAnnotation = new MessageType(
+ "test",
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.REQUIRED).named("id"),
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("metadata"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+ Type.Repetition.REQUIRED).named("value"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+ Type.Repetition.OPTIONAL).named("typed_value"))
+ .named("data"));
+
+ RowType rowType =
ParquetSchemaConverter.convertToRowType(shreddedNoAnnotation);
+ assertEquals(2, rowType.getFieldCount());
+ assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+ }
+
+ /**
+ * On Flink 2.1+, converting a RowType containing a Variant column to a
Parquet MessageType
+ * should produce a group with required binary {@code metadata} and {@code
value} fields.
+ * On pre-2.1 Flink this test is skipped since VariantType does not exist.
+ */
+ @Test
+ void testVariantWritePathProducesCorrectLayout() {
+ LogicalType variantType;
+ try {
+ variantType = DataTypeAdapter.createVariantType().getLogicalType();
+ } catch (UnsupportedOperationException e) {
+ // Pre-2.1 Flink: VariantType doesn't exist, skip
+ return;
+ }
+
+ RowType rowType = RowType.of(
+ new LogicalType[]{new IntType(), variantType},
+ new String[]{"id", "data"});
+
+ MessageType messageType =
ParquetSchemaConverter.convertToParquetMessageType("test", rowType);
+ assertEquals(2, messageType.getFieldCount());
+
+ Type variantField = messageType.getType("data");
+ assertTrue(variantField instanceof GroupType, "Variant column should be a
Parquet group");
+ GroupType variantGroup = (GroupType) variantField;
+ assertEquals(2, variantGroup.getFieldCount());
+ assertEquals(HoodieSchema.Variant.VARIANT_METADATA_FIELD,
variantGroup.getType(0).getName());
+ assertEquals(HoodieSchema.Variant.VARIANT_VALUE_FIELD,
variantGroup.getType(1).getName());
+ assertTrue(variantGroup.getType(0).isPrimitive());
+ assertTrue(variantGroup.getType(1).isPrimitive());
+ assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+ variantGroup.getType(0).asPrimitiveType().getPrimitiveTypeName());
+ assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+ variantGroup.getType(1).asPrimitiveType().getPrimitiveTypeName());
+
+ // On parquet 1.15.2, annotation is null (variantType() doesn't exist).
+ // On parquet 1.16.0+, annotation should be present.
+ LogicalTypeAnnotation annotation = variantGroup.getLogicalTypeAnnotation();
+ LogicalTypeAnnotation expectedAnnotation =
DataTypeAdapter.variantParquetAnnotation();
+ assertEquals(expectedAnnotation, annotation,
+ "Variant group annotation should match
DataTypeAdapter.variantParquetAnnotation()");
+ }
+
+ /**
+ * Verifies that {@link DataTypeAdapter#variantParquetAnnotation()}
gracefully returns null
+ * when parquet-java on the classpath does not have the {@code
variantType()} factory
+ * (i.e., parquet < 1.16.0). This test will need updating when parquet is
bumped.
+ */
+ @Test
+ void testVariantAnnotationReflectionFallback() {
+ LogicalTypeAnnotation annotation;
+ try {
+ annotation = DataTypeAdapter.variantParquetAnnotation();
+ } catch (UnsupportedOperationException e) {
+ // Pre-2.1 Flink: expected to throw, not return null
+ assertTrue(e.getMessage().contains("VARIANT type is only supported in
Flink 2.1+"));
+ return;
+ }
+ // On Flink 2.1 with parquet 1.15.2: should be null (reflection fallback)
+ // On Flink 2.1 with parquet 1.16.0+: should be non-null
+ assertNotNull(annotation == null ? "null" : annotation,
Review Comment:
🤖 nit: this `assertNotNull(annotation == null ? "null" : annotation, ...)`
always passes — when `annotation` is null we pass the literal string "null",
which is non-null, and otherwise we pass `annotation` itself, which is also
non-null, so the assertion can never fail. If the intent is "either null or a
VariantLogicalTypeAnnotation is acceptable," it would be clearer to drop the
assertion entirely (or assert the annotation's type when it is non-null) so that
what the test actually verifies matches its name.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]