Jonathan Vexler created HUDI-9670:
-------------------------------------

             Summary: Fix schema.on.write support for flink reader
                 Key: HUDI-9670
                 URL: https://issues.apache.org/jira/browse/HUDI-9670
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink
            Reporter: Jonathan Vexler


org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs
should have most (ideally all) schema evolution cases enabled. 

 

org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader
 reads the Parquet footer. One way to add this support would be to pull the 
footer reading up a few layers, use the util method pruneDataSchema from 
[https://github.com/apache/hudi/pull/13654], and then cast/project from the 
pruned schema to the requested schema.

 

 The test data generator will also need to be updated because of a Flink bug 
(HUDI-9603). 

Here are some fixes I already did:
{code:java}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
index a0741ea6705..fb46996317e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
@@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements 
RowProjection {
       case ROW:
         return createRowProjection(fromType, toType, renamedColumns, 
fieldNameStack);
       default:
-        if (fromType.equals(toType)) {
+        if (fromType.equals(toType) || 
fromType.getTypeRoot().equals(toType.getTypeRoot())) {
           return TypeConverters.NOOP_CONVERTER;
         } else {
           // return TypeConverter directly for non-composite type
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
index cef2df7c037..83b886307ab 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
@@ -53,7 +53,10 @@ import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Tool class used to perform supported casts from a {@link LogicalType} to 
another
@@ -81,6 +84,20 @@ public class TypeConverters {
     LogicalTypeRoot to = toType.getTypeRoot();
 
     switch (to) {
+      case VARBINARY: {
+        if (from == VARCHAR) {
+          return new TypeConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object val) {
+              return getUTF8Bytes(val.toString());
+            }
+          };
+        }
+        break;
+      }
+
       case BIGINT: {
         if (from == INTEGER) {
           return new TypeConverter() {
@@ -202,6 +219,16 @@ public class TypeConverters {
             }
           };
         }
+        if (from == VARBINARY) {
+          return new TypeConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object val) {
+              return new BinaryStringData(fromUTF8Bytes((byte[]) val));
+            }
+          };
+        }
         break;
       }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bb671a8f5ef..f165161b317 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -1336,6 +1336,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
 
     // Bytes
     public boolean bytesToStringSupport = true;
+
+    // TODO: [HUDI-9607] Flink VARBINARY in array and map
+    public boolean supportBytesInArrayMap = true;
   }
 
   private enum SchemaEvolutionTypePromotionCase {
@@ -1430,13 +1433,13 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     if (toplevel) {
       if (configs.mapSupport) {
         List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
-        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
+        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map", 
!configs.supportBytesInArrayMap);
         finalFields.add(new Schema.Field(fieldPrefix + "Map", 
Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, 
mapFields)), "", null));
       }
 
-      if (configs.arraySupport) {
+      if (configs.arraySupport && configs.anyArraySupport) {
         List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
-        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
+        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array", 
!configs.supportBytesInArrayMap);
         finalFields.add(new Schema.Field(fieldPrefix + "Array", 
Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, 
false, arrayFields)), "", null));
       }
     }
@@ -1444,12 +1447,21 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
   }
 
   private static void addFieldsHelper(List<Schema.Field> finalFields, 
List<Schema.Type> baseFields, String fieldPrefix) {
+    addFieldsHelper(finalFields, baseFields, fieldPrefix, false);
+  }
+
+  // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is fixed
+  private static void addFieldsHelper(List<Schema.Field> finalFields, 
List<Schema.Type> baseFields, String fieldPrefix, boolean 
replaceBytesWithStrings) {
     for (int i = 0; i < baseFields.size(); i++) {
       if (baseFields.get(i) == Schema.Type.BOOLEAN) {
         // boolean fields are added fields
         finalFields.add(new Schema.Field(fieldPrefix + i, 
AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
       } else {
-        finalFields.add(new Schema.Field(fieldPrefix + i, 
Schema.create(baseFields.get(i)), "", null));
+        Schema.Type type = baseFields.get(i);
+        if (replaceBytesWithStrings && type == Schema.Type.BYTES) {
+          type = Schema.Type.STRING;
+        }
+        finalFields.add(new Schema.Field(fieldPrefix + i, Schema.create(type), 
"", null));
       }
     }
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index dc66a0e6a74..f659e06ad50 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
   @Override
   public HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs() {
     HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
HoodieTestDataGenerator.SchemaEvolutionConfigs();
-    configs.nestedSupport = false;
+    configs.supportBytesInArrayMap = false;
     configs.arraySupport = false;
-    configs.mapSupport = false;
     configs.anyArraySupport = false;
-    configs.addNewFieldSupport = false;
-    configs.intToLongSupport = false;
-    configs.intToFloatSupport = false;
-    configs.intToDoubleSupport = false;
-    configs.intToStringSupport = false;
-    configs.longToFloatSupport = false;
-    configs.longToDoubleSupport = false;
-    configs.longToStringSupport = false;
-    configs.floatToDoubleSupport = false;
-    configs.floatToStringSupport = false;
-    configs.doubleToStringSupport = false;
-    configs.stringToBytesSupport = false;
-    configs.bytesToStringSupport = false;
     return configs;
   }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to