hudi-agent commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3238741771


##########
hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java:
##########
@@ -25,11 +25,37 @@
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.variant.BinaryVariant;
 import org.apache.flink.types.variant.Variant;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+import java.lang.reflect.Method;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+
+  /**
+   * Cached VARIANT annotation resolved via reflection, or {@code null} if parquet-java
+   * on the classpath predates {@code LogicalTypeAnnotation.variantType()} (< 1.16.0).
+   */
+  private static final LogicalTypeAnnotation VARIANT_ANNOTATION = resolveVariantAnnotation();
+
+  private static LogicalTypeAnnotation resolveVariantAnnotation() {
+    try {
+      Method factory = LogicalTypeAnnotation.class.getMethod("variantType", byte.class);
+      return (LogicalTypeAnnotation) factory.invoke(null, (byte) 1);

Review Comment:
   🤖 nit: the `(byte) 1` passed to `variantType()` is a magic number whose 
meaning (shredding version) isn't obvious from the call site — could you 
extract it as `private static final byte VARIANT_SHREDDING_VERSION = 1;` or at 
least drop a short inline comment explaining what it represents?
   
   - AI-generated; verify before applying.
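The constant extraction suggested in the comment above could look roughly like the sketch below. It is written to compile without parquet-java, so the hypothetical helper `resolveStaticFactory` looks the factory up by class name and returns `null` on any reflective failure, mirroring the fallback in the diff. The constant name `VARIANT_SHREDDING_VERSION` is the reviewer's proposal; the claim that the byte is a version number passed to `LogicalTypeAnnotation.variantType(byte)` comes from the diff and review comment and has not been checked against parquet-java here.

```java
import java.lang.reflect.Method;

final class VariantAnnotationSketch {

  /**
   * Version byte passed to LogicalTypeAnnotation.variantType(byte).
   * Name as proposed in the review; meaning (shredding version) per the review comment.
   */
  static final byte VARIANT_SHREDDING_VERSION = 1;

  private VariantAnnotationSketch() {
  }

  /**
   * Hypothetical helper: invokes a public static factory taking a single byte argument,
   * returning null if the class or method is missing or the reflective call fails.
   */
  static Object resolveStaticFactory(String className, String methodName, byte arg) {
    try {
      Class<?> owner = Class.forName(className);
      Method factory = owner.getMethod(methodName, byte.class);
      // Static method, so the receiver is null; the byte is boxed and unboxed by invoke.
      return factory.invoke(null, arg);
    } catch (ReflectiveOperationException e) {
      // Older (or absent) library on the classpath: fall back to "no annotation".
      return null;
    }
  }

  /** Resolves the parquet-java VARIANT annotation, or null when the factory is unavailable. */
  static Object resolveVariantAnnotation() {
    return resolveStaticFactory(
        "org.apache.parquet.schema.LogicalTypeAnnotation", "variantType", VARIANT_SHREDDING_VERSION);
  }

  public static void main(String[] args) {
    // Without parquet-java (or with parquet-java < 1.16.0) on the classpath this prints "null".
    System.out.println(resolveVariantAnnotation());
  }
}
```

In the Hudi adapter itself the class literal `LogicalTypeAnnotation.class` can stay as in the diff, since parquet-java is a compile-time dependency; only the `(byte) 1` literal needs to become the named constant.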



##########
hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java:
##########
@@ -216,4 +228,117 @@ void testConvertTimestampTypes() {
         + "}\n";
     assertThat(messageType.toString(), is(expected));
   }
+
+  /**
+   * A Parquet group with metadata + value binary fields but NO VARIANT annotation must be
+   * treated as a plain ROW. Only the Parquet {@code VARIANT} annotation triggers variant
+   * detection in this converter; unannotated groups are never guessed as variant.
+   */
+  @Test
+  void testVariantPhysicalLayoutTreatedAsRow() {
+    MessageType variantParquet = new MessageType(
+        "test",
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+            Type.Repetition.REQUIRED).named("id"),
+        Types.buildGroup(Type.Repetition.REQUIRED)
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("metadata"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("value"))
+            .named("data"));
+
+    RowType rowType = ParquetSchemaConverter.convertToRowType(variantParquet);
+    assertEquals(2, rowType.getFieldCount());
+    assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+  }
+
+  /**
+   * Unannotated group with metadata + value + typed_value (3 fields) is treated as a generic
+   * ROW when no annotation or schema hint is present.
+   */
+  @Test
+  void testUnannotatedShreddedGroupTreatedAsRow() {
+    MessageType shreddedNoAnnotation = new MessageType(
+        "test",
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+            Type.Repetition.REQUIRED).named("id"),
+        Types.buildGroup(Type.Repetition.REQUIRED)
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("metadata"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("value"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+                Type.Repetition.OPTIONAL).named("typed_value"))
+            .named("data"));
+
+    RowType rowType = ParquetSchemaConverter.convertToRowType(shreddedNoAnnotation);
+    assertEquals(2, rowType.getFieldCount());
+    assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+  }
+
+  /**
+   * On Flink 2.1+, converting a RowType containing a Variant column to a Parquet MessageType
+   * should produce a group with required binary {@code metadata} and {@code value} fields.
+   * On pre-2.1 Flink this test is skipped since VariantType does not exist.
+   */
+  @Test
+  void testVariantWritePathProducesCorrectLayout() {
+    LogicalType variantType;
+    try {
+      variantType = DataTypeAdapter.createVariantType().getLogicalType();
+    } catch (UnsupportedOperationException e) {
+      // Pre-2.1 Flink: VariantType doesn't exist, skip
+      return;
+    }
+
+    RowType rowType = RowType.of(
+        new LogicalType[]{new IntType(), variantType},
+        new String[]{"id", "data"});
+
+    MessageType messageType = ParquetSchemaConverter.convertToParquetMessageType("test", rowType);
+    assertEquals(2, messageType.getFieldCount());
+
+    Type variantField = messageType.getType("data");
+    assertTrue(variantField instanceof GroupType, "Variant column should be a Parquet group");
+    GroupType variantGroup = (GroupType) variantField;
+    assertEquals(2, variantGroup.getFieldCount());
+    assertEquals(HoodieSchema.Variant.VARIANT_METADATA_FIELD, variantGroup.getType(0).getName());
+    assertEquals(HoodieSchema.Variant.VARIANT_VALUE_FIELD, variantGroup.getType(1).getName());
+    assertTrue(variantGroup.getType(0).isPrimitive());
+    assertTrue(variantGroup.getType(1).isPrimitive());
+    assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+        variantGroup.getType(0).asPrimitiveType().getPrimitiveTypeName());
+    assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+        variantGroup.getType(1).asPrimitiveType().getPrimitiveTypeName());
+
+    // On parquet 1.15.2, annotation is null (variantType() doesn't exist).
+    // On parquet 1.16.0+, annotation should be present.
+    LogicalTypeAnnotation annotation = variantGroup.getLogicalTypeAnnotation();
+    LogicalTypeAnnotation expectedAnnotation = DataTypeAdapter.variantParquetAnnotation();
+    assertEquals(expectedAnnotation, annotation,
+        "Variant group annotation should match DataTypeAdapter.variantParquetAnnotation()");
+  }
+
+  /**
+   * Verifies that {@link DataTypeAdapter#variantParquetAnnotation()} gracefully returns null
+   * when parquet-java on the classpath does not have the {@code variantType()} factory
+   * (i.e., parquet < 1.16.0). This test will need updating when parquet is bumped.
+   */
+  @Test
+  void testVariantAnnotationReflectionFallback() {
+    LogicalTypeAnnotation annotation;
+    try {
+      annotation = DataTypeAdapter.variantParquetAnnotation();
+    } catch (UnsupportedOperationException e) {
+      // Pre-2.1 Flink: expected to throw, not return null
+      assertTrue(e.getMessage().contains("VARIANT type is only supported in Flink 2.1+"));
+      return;
+    }
+    // On Flink 2.1 with parquet 1.15.2: should be null (reflection fallback)
+    // On Flink 2.1 with parquet 1.16.0+: should be non-null
+    assertNotNull(annotation == null ? "null" : annotation,
+        "On parquet 1.15.2, variantParquetAnnotation() should return null; "

Review Comment:
   🤖 nit: the assertion `assertNotNull(annotation == null ? "null" : 
annotation, ...)` is a tautology — both branches produce a non-null value, so 
it can never fail. Could you replace it with two explicit branches, e.g. `if 
(annotation == null) { /* nothing to assert — null is valid on parquet < 1.16 
*/ } else { assertNotNull(annotation); }`, or just drop the assertion entirely 
and leave the comment?
   
   - AI-generated; verify before applying.
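As an alternative to the two-branch form suggested in the comment above, the assertion could tie the expected outcome to the classpath itself, so that it fails in both directions: when the factory exists but nothing was resolved, and when it is absent but an annotation came back. The sketch below uses only the JDK; the helper names `hasPublicMethod` and `assertMatchesClasspath` are hypothetical. In the Hudi test the owner class would presumably be `LogicalTypeAnnotation` and the factory name `variantType`, checked only on the non-throwing (Flink 2.1+) path.

```java
import java.lang.reflect.Method;

final class OptionalFactoryAssertions {

  private OptionalFactoryAssertions() {
  }

  /** Returns true if {@code owner} exposes a public method named {@code name}. */
  static boolean hasPublicMethod(Class<?> owner, String name) {
    for (Method m : owner.getMethods()) {
      if (m.getName().equals(name)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fails unless {@code resolved} is non-null exactly when the factory is present,
   * so both a missing and an unexpected value are caught (unlike a tautological check).
   */
  static void assertMatchesClasspath(Class<?> owner, String factoryName, Object resolved) {
    boolean factoryPresent = hasPublicMethod(owner, factoryName);
    if (factoryPresent != (resolved != null)) {
      throw new AssertionError(owner.getSimpleName() + "." + factoryName
          + (factoryPresent
              ? " exists but nothing was resolved"
              : " is absent but a value was resolved"));
    }
  }
}
```

With JUnit 5 the same check can be written inline as `assertEquals(hasVariantFactory, annotation != null, message)`, where `hasVariantFactory` is computed from `LogicalTypeAnnotation.class.getMethods()`; that keeps the test meaningful after a parquet bump instead of requiring an edit.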



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
