cshuo commented on code in PR #18723:
URL: https://github.com/apache/hudi/pull/18723#discussion_r3309212123
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java:
##########
@@ -143,19 +173,34 @@ public static AvroToRowDataConverter createConverter(LogicalType type, boolean u
return createTimestampConverter(((TimestampType) type).getPrecision(), utcTimezone);
case CHAR:
case VARCHAR:
- return avroObject -> avroObject instanceof Utf8 ? StringData.fromBytes(((Utf8) avroObject).getBytes()) : StringData.fromString(avroObject.toString());
+ return avroObject -> avroObject instanceof Utf8
+ ? StringData.fromBytes(((Utf8) avroObject).getBytes())
+ : StringData.fromString(avroObject.toString());
case BINARY:
case VARBINARY:
return AvroToRowDataConverters::convertToBytes;
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ARRAY:
- return createArrayConverter((ArrayType) type, utcTimezone);
+ if (nonNullSchema.getType() == HoodieSchemaType.VECTOR) {
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) nonNullSchema;
+ VectorConversionUtils.validateVectorLogicalType(vectorSchema, type);
+ return createVectorConverter(vectorSchema);
+ }
+ return createArrayConverter(nonNullSchema.getElementType(), (ArrayType) type, utcTimezone);
case ROW:
- return createRowConverter((RowType) type, utcTimezone);
+ if (nonNullSchema.getType() == HoodieSchemaType.UNION) {
+ // getNonNullType() unwraps simple nullable unions only. Complex unions can still reach
+ // here when their Flink representation is a ROW, for example fields inside
+ // ColumnStatsSchemas.METADATA_SCHEMA. In that case the RowType already captures the
+ // target Flink shape, so use the first union branch only as the positional Hoodie schema
+ // template for building nested field converters.
+ nonNullSchema = nonNullSchema.getTypes().get(0);
Review Comment:
Yes, that makes sense. Since createRowConverter(HoodieSchema, RowType,
boolean) is the boundary where we build a positional ROW converter, I’ll
normalize the remaining complex union there before deriving field converters.
getNonNullType() already handles nullable unions, and for the complex-union ROW
case we can use the first non-null branch as the record-shaped template.
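A minimal sketch of the "first non-null branch as the record-shaped template" rule described above. It uses a hypothetical, self-contained schema model: `UnionTemplateSketch`, `Kind`, `Schema` and `rowTemplate` are illustrative names, not the Hudi `HoodieSchema` API, and the sketch assumes a NULL branch may sit anywhere in the union.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a tiny stand-in schema model, not the Hudi HoodieSchema API. */
final class UnionTemplateSketch {

  enum Kind { NULL, RECORD, UNION }

  static final class Schema {
    final Kind kind;
    final String name;
    final List<Schema> branches; // populated only for UNION schemas

    Schema(Kind kind, String name, List<Schema> branches) {
      this.kind = kind;
      this.name = name;
      this.branches = branches;
    }

    static Schema of(Kind kind, String name) {
      return new Schema(kind, name, List.of());
    }

    static Schema union(Schema... branches) {
      return new Schema(Kind.UNION, "union", Arrays.asList(branches));
    }
  }

  /**
   * Picks the positional template for a ROW converter: a non-union schema is
   * returned unchanged, and a union yields its first non-NULL branch.
   */
  static Schema rowTemplate(Schema schema) {
    if (schema.kind != Kind.UNION) {
      return schema;
    }
    for (Schema branch : schema.branches) {
      if (branch.kind != Kind.NULL) {
        return branch;
      }
    }
    throw new IllegalArgumentException("union has no non-null branch");
  }

  public static void main(String[] args) {
    // A complex union: NULL plus two record-shaped branches.
    Schema complex = Schema.union(
        Schema.of(Kind.NULL, "null"),
        Schema.of(Kind.RECORD, "StatsA"),
        Schema.of(Kind.RECORD, "StatsB"));
    System.out.println(rowTemplate(complex).name); // prints StatsA
  }
}
```

Skipping NULL explicitly (rather than always taking index 0) keeps the template record-shaped even if a complex union lists `null` first.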
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java:
##########
@@ -78,16 +81,27 @@ public interface AvroToRowDataConverter extends Serializable {
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
- public static AvroToRowDataConverter createRowConverter(RowType rowType) {
Review Comment:
Good point. The in-repo usages have been migrated to the HoodieSchema-aware
overload so VECTOR metadata is preserved where needed, but keeping the
RowType-only overload is cheap and avoids breaking existing callers. I'll
restore it as a compatibility/convenience path, with the caveat that it cannot
recover Hoodie-specific logical metadata from RowType alone.
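A hedged sketch of the compatibility-overload pattern and its caveat: the convenience overload has no Hoodie metadata to pass along, so an ARRAY column can only take the plain array path. All names here (`OverloadSketch`, `VectorMeta`, `chooseConverter`) are hypothetical, and treating the VECTOR dimension as the only missing metadata is an assumption made to keep the example small.

```java
import java.util.Optional;

/** Illustrative only: names and types are hypothetical, not the Hudi converter API. */
final class OverloadSketch {

  /** Metadata that only the Hoodie schema carries; assumed here to be the VECTOR dimension. */
  static final class VectorMeta {
    final int dimension;

    VectorMeta(int dimension) {
      this.dimension = dimension;
    }
  }

  /** Convenience overload: only the Flink type is known, so no VECTOR metadata is available. */
  static String chooseConverter(String flinkType) {
    return chooseConverter(flinkType, Optional.empty());
  }

  /** Metadata-aware overload: takes the vector path only when VECTOR metadata is supplied. */
  static String chooseConverter(String flinkType, Optional<VectorMeta> vectorMeta) {
    boolean isArray = flinkType.startsWith("ARRAY");
    if (isArray && vectorMeta.isPresent()) {
      return "vector(dim=" + vectorMeta.get().dimension + ")";
    }
    return isArray ? "array" : "other";
  }

  public static void main(String[] args) {
    // The same Flink type is routed differently depending on the metadata available.
    System.out.println(chooseConverter("ARRAY<FLOAT>"));                                   // prints array
    System.out.println(chooseConverter("ARRAY<FLOAT>", Optional.of(new VectorMeta(128)))); // prints vector(dim=128)
  }
}
```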
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java:
##########
@@ -78,16 +81,27 @@ public interface AvroToRowDataConverter extends Serializable {
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
- public static AvroToRowDataConverter createRowConverter(RowType rowType) {
- return createRowConverter(rowType, true);
+ /**
+ * Creates a row converter using only the Flink row type.
+ *
+ * <p>This converter cannot recover Hoodie-specific logical type metadata from {@link RowType}.
+ * Use {@link #createRowConverter(HoodieSchema, RowType, boolean)} when a Hoodie schema is
+ * available, especially for VECTOR columns.
+ */
+ public static AvroToRowDataConverter createRowConverter(HoodieSchema hoodieSchema) {
+ return createRowConverter(hoodieSchema, (RowType) HoodieSchemaConverter.convertToDataType(hoodieSchema).getLogicalType(), true);
}
- public static AvroToRowDataConverter createRowConverter(RowType rowType, boolean utcTimezone) {
- final AvroToRowDataConverter[] fieldConverters =
- rowType.getFields().stream()
- .map(RowType.RowField::getType)
- .map(type -> AvroToRowDataConverters.createNullableConverter(type, utcTimezone))
- .toArray(AvroToRowDataConverter[]::new);
+ /**
+ * Creates a row converter using both Hoodie schema metadata and the target Flink row type.
+ */
+ public static AvroToRowDataConverter createRowConverter(HoodieSchema schema, RowType rowType, boolean utcTimezone) {
+ HoodieSchema recordSchema = schema.getNonNullType();
+ final List<HoodieSchemaField> fields = recordSchema.getFields();
+ final AvroToRowDataConverter[] fieldConverters = new AvroToRowDataConverter[rowType.getFieldCount()];
Review Comment:
Updated.
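For readers of this thread, the positional pairing in the new `createRowConverter(HoodieSchema, RowType, boolean)` overload can be sketched as below: the i-th schema field supplies metadata and the i-th row field supplies the Flink type. This is a simplified stand-in, not the PR's code; `PositionalConvertersSketch`, `Field` and `planFieldConverters` are hypothetical, converters are modelled as strings, and the size-mismatch check is an assumption added for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hypothetical stand-ins for HoodieSchemaField and RowType fields. */
final class PositionalConvertersSketch {

  static final class Field {
    final String name;
    final String hoodieType; // e.g. "STRING" or "VECTOR"

    Field(String name, String hoodieType) {
      this.name = name;
      this.hoodieType = hoodieType;
    }
  }

  /**
   * Plans one converter per row position by pairing the i-th schema field with the
   * i-th Flink field type. The size check is an assumption added for this sketch.
   */
  static List<String> planFieldConverters(List<Field> schemaFields, List<String> rowFieldTypes) {
    if (schemaFields.size() != rowFieldTypes.size()) {
      throw new IllegalArgumentException(
          "schema has " + schemaFields.size() + " fields, row type has " + rowFieldTypes.size());
    }
    List<String> plan = new ArrayList<>(rowFieldTypes.size());
    for (int i = 0; i < rowFieldTypes.size(); i++) {
      Field field = schemaFields.get(i);
      String flinkType = rowFieldTypes.get(i);
      // The schema field contributes metadata the Flink type lacks (here: the VECTOR marker).
      boolean vector = "VECTOR".equals(field.hoodieType) && flinkType.startsWith("ARRAY");
      plan.add(field.name + ":" + (vector ? "vector" : flinkType));
    }
    return plan;
  }

  public static void main(String[] args) {
    List<Field> fields = List.of(new Field("id", "STRING"), new Field("emb", "VECTOR"));
    List<String> rowTypes = List.of("VARCHAR", "ARRAY<FLOAT>");
    System.out.println(planFieldConverters(fields, rowTypes)); // prints [id:VARCHAR, emb:vector]
  }
}
```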
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java:
##########
@@ -120,6 +124,49 @@ public static int[] projectOrdinals(RowType rowType, RowType producedRowType) {
return producedRowType.getFieldNames().stream().mapToInt(fieldNames::indexOf).toArray();
}
+ /**
+ * Creates the hoodie required schema for a projected Flink row type.
+ *
+ * <p>When a requested field is a hoodie specific logical type in {@code tableSchema}, this method
+ * reuses the table schema field to preserve logical metadata that cannot be recovered from Flink
+ * {@link RowType}, for example VARIANT semantics or VECTOR element type and dimension. Other
+ * fields are taken from the schema converted from {@code requiredRowType}, so readers use the
+ * projected field schema and can still keep missing required columns in the requested schema for
+ * later schema-evolution/default-value handling.
+ *
+ * @param tableSchema source table schema with hoodie logical type metadata
+ * @param requiredRowType projected Flink row type requested by the query
+ * @return required hoodie schema matching the projected field order
+ */
+ public static HoodieSchema createRequiredSchema(HoodieSchema tableSchema, RowType requiredRowType) {
+ HoodieSchema fallbackRequiredSchema = HoodieSchemaConverter.convertToSchema(requiredRowType);
Review Comment:
I avoided `generateProjectionSchema` here because it would copy every
projected field directly from `tableSchema`. For ordinary fields we still want
the schema derived from `requiredRowType`, so the reader keeps the projected
query shape and schema-evolution/default-value handling. We only reuse
`tableSchema` fields for Hoodie logical types whose metadata cannot be
reconstructed from RowType, currently VECTOR and VARIANT.
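The per-field choice described in this reply can be sketched as follows: walk the RowType-derived fields in projected order and swap in the table field only for VECTOR or VARIANT. This is a flat, hypothetical model (`RequiredSchemaSketch`, `Field`, `requiredSchema` are illustrative names, not the Hudi API), and matching table fields to requested fields by name is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a flat field model standing in for HoodieSchema; not the Hudi API. */
final class RequiredSchemaSketch {

  static final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
      this.name = name;
      this.type = type;
    }

    @Override
    public String toString() {
      return name + ":" + type;
    }
  }

  /** Logical types whose metadata cannot be rebuilt from a Flink RowType. */
  static final Set<String> TABLE_ONLY_TYPES = Set.of("VECTOR", "VARIANT");

  /**
   * Keeps the projected order of the fallback (RowType-derived) fields and swaps in the
   * table field, matched by name, only when it carries a table-only logical type.
   */
  static List<Field> requiredSchema(List<Field> tableFields, List<Field> fallbackFields) {
    Map<String, Field> tableByName = new HashMap<>();
    for (Field f : tableFields) {
      tableByName.put(f.name, f);
    }
    List<Field> result = new ArrayList<>(fallbackFields.size());
    for (Field fallback : fallbackFields) {
      Field tableField = tableByName.get(fallback.name);
      boolean reuseTableField = tableField != null && TABLE_ONLY_TYPES.contains(tableField.type);
      result.add(reuseTableField ? tableField : fallback);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Field> table = List.of(new Field("id", "STRING"), new Field("emb", "VECTOR"), new Field("price", "DOUBLE"));
    // "discount" is absent from the table schema and stays as the fallback field.
    List<Field> fallback = List.of(new Field("emb", "ARRAY"), new Field("price", "DOUBLE"), new Field("discount", "DOUBLE"));
    System.out.println(requiredSchema(table, fallback)); // prints [emb:VECTOR, price:DOUBLE, discount:DOUBLE]
  }
}
```

Unlike copying every projected field from the table schema, this keeps a requested column that the table schema lacks, which is what later default-value handling needs.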
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -535,12 +535,12 @@ private Option<Function<Integer, Integer>> getDataBucketFunc(List<ResolvedExpres
return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
rowDataType, inputSplits, false);
case COPY_ON_WRITE:
- return baseFileOnlyInputFormat();
+ return baseFileOnlyInputFormat(tableSchema);
Review Comment:
Yes, that is expected here. In the COW path the tableSchema is only used to
discover Hoodie logical VECTOR metadata by field name; metadata fields are
plain string fields and are ignored by VectorConversionUtils#getVectorFields.
The actual read projection still comes from fullFieldNames/selectedFields, so
including Hoodie metadata fields in tableSchema does not change the projected
row layout.
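A sketch of why the extra metadata fields are harmless here: VECTOR fields are looked up by name, non-VECTOR fields (such as `_hoodie_commit_time`) are skipped, and positions are computed over the selected projection only. `VectorFieldLookupSketch`, `vectorFieldsByName` and `vectorPositions` are hypothetical helpers written for this illustration; they are not `VectorConversionUtils#getVectorFields`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a stand-in for looking up VECTOR fields by name; not VectorConversionUtils. */
final class VectorFieldLookupSketch {

  static final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
      this.name = name;
      this.type = type;
    }
  }

  /** Collects VECTOR fields keyed by name; every other field, including metadata columns, is skipped. */
  static Map<String, Field> vectorFieldsByName(List<Field> tableFields) {
    Map<String, Field> result = new LinkedHashMap<>();
    for (Field f : tableFields) {
      if ("VECTOR".equals(f.type)) {
        result.put(f.name, f);
      }
    }
    return result;
  }

  /** Positions within the selected projection that need vector handling. */
  static List<Integer> vectorPositions(List<String> selectedFieldNames, Map<String, Field> vectorFields) {
    List<Integer> positions = new ArrayList<>();
    for (int i = 0; i < selectedFieldNames.size(); i++) {
      if (vectorFields.containsKey(selectedFieldNames.get(i))) {
        positions.add(i);
      }
    }
    return positions;
  }

  public static void main(String[] args) {
    List<Field> tableSchema = List.of(
        new Field("_hoodie_commit_time", "STRING"),
        new Field("_hoodie_record_key", "STRING"),
        new Field("id", "STRING"),
        new Field("emb", "VECTOR"));
    // The projection comes from the selected fields, so leading metadata columns do not shift it.
    List<String> selected = List.of("id", "emb");
    System.out.println(vectorPositions(selected, vectorFieldsByName(tableSchema))); // prints [1]
  }
}
```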
--
This is an automated message from the Apache Git Service.