Jonathan Vexler created HUDI-9670:
-------------------------------------
Summary: Fix schema.on.write support for flink reader
Key: HUDI-9670
URL: https://issues.apache.org/jira/browse/HUDI-9670
Project: Apache Hudi
Issue Type: Bug
Components: flink
Reporter: Jonathan Vexler
org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs
should have most/all cases enabled.
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader
reads the parquet footer. One way to add this support would be to pull the
footer read up a few layers, use the util method pruneDataSchema from
[https://github.com/apache/hudi/pull/13654], and then cast/project from the
pruned schema to the requested schema.
The test datagen will need to be updated because of a Flink bug tracked in
HUDI-9603.
Here are some fixes I have already made:
{code:java}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
index a0741ea6705..fb46996317e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
@@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements
RowProjection {
case ROW:
return createRowProjection(fromType, toType, renamedColumns,
fieldNameStack);
default:
- if (fromType.equals(toType)) {
+ if (fromType.equals(toType) ||
fromType.getTypeRoot().equals(toType.getTypeRoot())) {
return TypeConverters.NOOP_CONVERTER;
} else {
// return TypeConverter directly for non-composite type
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
index cef2df7c037..83b886307ab 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
@@ -53,7 +53,10 @@ import static
org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Tool class used to perform supported casts from a {@link LogicalType} to
another
@@ -81,6 +84,20 @@ public class TypeConverters {
LogicalTypeRoot to = toType.getTypeRoot();
switch (to) {
+ case VARBINARY: {
+ if (from == VARCHAR) {
+ return new TypeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object val) {
+ return getUTF8Bytes(val.toString());
+ }
+ };
+ }
+ break;
+ }
+
case BIGINT: {
if (from == INTEGER) {
return new TypeConverter() {
@@ -202,6 +219,16 @@ public class TypeConverters {
}
};
}
+ if (from == VARBINARY) {
+ return new TypeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object val) {
+ return new BinaryStringData(fromUTF8Bytes((byte[]) val));
+ }
+ };
+ }
break;
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bb671a8f5ef..f165161b317 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -1336,6 +1336,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
// Bytes
public boolean bytesToStringSupport = true;
+
+ // TODO: [HUDI-9607] Flink VARBINARY in array and map
+ public boolean supportBytesInArrayMap = true;
}
private enum SchemaEvolutionTypePromotionCase {
@@ -1430,13 +1433,13 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
if (toplevel) {
if (configs.mapSupport) {
List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
- addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
+ addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map",
!configs.supportBytesInArrayMap);
finalFields.add(new Schema.Field(fieldPrefix + "Map",
Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false,
mapFields)), "", null));
}
- if (configs.arraySupport) {
+ if (configs.arraySupport && configs.anyArraySupport) {
List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
- addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
+ addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array",
!configs.supportBytesInArrayMap);
finalFields.add(new Schema.Field(fieldPrefix + "Array",
Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace,
false, arrayFields)), "", null));
}
}
@@ -1444,12 +1447,21 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
}
private static void addFieldsHelper(List<Schema.Field> finalFields,
List<Schema.Type> baseFields, String fieldPrefix) {
+ addFieldsHelper(finalFields, baseFields, fieldPrefix, false);
+ }
+
+ // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is fixed
+ private static void addFieldsHelper(List<Schema.Field> finalFields,
List<Schema.Type> baseFields, String fieldPrefix, boolean
replaceBytesWithStrings) {
for (int i = 0; i < baseFields.size(); i++) {
if (baseFields.get(i) == Schema.Type.BOOLEAN) {
// boolean fields are added fields
finalFields.add(new Schema.Field(fieldPrefix + i,
AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
} else {
- finalFields.add(new Schema.Field(fieldPrefix + i,
Schema.create(baseFields.get(i)), "", null));
+ Schema.Type type = baseFields.get(i);
+ if (replaceBytesWithStrings && type == Schema.Type.BYTES) {
+ type = Schema.Type.STRING;
+ }
+ finalFields.add(new Schema.Field(fieldPrefix + i, Schema.create(type),
"", null));
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index dc66a0e6a74..f659e06ad50 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
@Override
public HoodieTestDataGenerator.SchemaEvolutionConfigs
getSchemaEvolutionConfigs() {
HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new
HoodieTestDataGenerator.SchemaEvolutionConfigs();
- configs.nestedSupport = false;
+ configs.supportBytesInArrayMap = false;
configs.arraySupport = false;
- configs.mapSupport = false;
configs.anyArraySupport = false;
- configs.addNewFieldSupport = false;
- configs.intToLongSupport = false;
- configs.intToFloatSupport = false;
- configs.intToDoubleSupport = false;
- configs.intToStringSupport = false;
- configs.longToFloatSupport = false;
- configs.longToDoubleSupport = false;
- configs.longToStringSupport = false;
- configs.floatToDoubleSupport = false;
- configs.floatToStringSupport = false;
- configs.doubleToStringSupport = false;
- configs.stringToBytesSupport = false;
- configs.bytesToStringSupport = false;
return configs;
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)