balaji-varadarajan-ai commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2957120068
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -550,6 +552,26 @@ private static DataType convertUnion(HoodieSchema schema) {
return nullable ? rawDataType.nullable() : rawDataType;
}
+ /**
+ * Converts a Variant schema to Flink's ROW type.
+ * Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink.
+ * // TODO: We are only supporting unshredded for now, support shredded in the future
+ *
+ * @param schema HoodieSchema to convert (must be a VARIANT type)
+ * @return DataType representing the Variant as a ROW with binary fields
+ */
+ private static DataType convertVariant(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.VARIANT) {
+ throw new IllegalStateException("Expected HoodieSchema.Variant but got: " + schema.getClass());
+ }
+
+ // Variant is stored as a struct with two binary fields: value and metadata
+ return DataTypes.ROW(
Review Comment:
thanks
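As a rough illustration of the field layout that `convertVariant` produces, here is a dependency-free sketch. It does not use Flink. `SketchSchemaType`, `SketchField`, and `VariantLayoutSketch` are hypothetical names, and the `"BYTES"` strings only stand in for `DataTypes.BYTES()`. It covers the unshredded case only, matching the TODO in the diff.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieSchemaType (assumption, not the Hudi enum).
enum SketchSchemaType { VARIANT, RECORD, STRING }

// Hypothetical stand-in for a Flink ROW field: a name plus a type label.
final class SketchField {
  final String name;
  final String type;

  SketchField(String name, String type) {
    this.name = name;
    this.type = type;
  }

  @Override
  public String toString() {
    return "`" + name + "` " + type;
  }
}

final class VariantLayoutSketch {
  private VariantLayoutSketch() {}

  // Rejects non-VARIANT input, mirroring the guard at the top of convertVariant.
  static List<SketchField> unshreddedVariantFields(SketchSchemaType type) {
    if (type != SketchSchemaType.VARIANT) {
      throw new IllegalStateException("Expected VARIANT but got: " + type);
    }
    // Unshredded layout: two binary fields, value first, then metadata.
    return List.of(new SketchField("value", "BYTES"), new SketchField("metadata", "BYTES"));
  }

  // Renders the fields in the ROW<...> notation used by the Javadoc.
  static String describe(List<SketchField> fields) {
    return fields.stream()
        .map(SketchField::toString)
        .collect(Collectors.joining(", ", "ROW<", ">"));
  }
}
```

Passing `SketchSchemaType.VARIANT` through both methods yields the string ROW<`value` BYTES, `metadata` BYTES>, the same shape documented in the Javadoc; any other type fails fast with an `IllegalStateException`.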
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -441,6 +455,24 @@ private static HoodieSchema visitInternalSchemaToBuildHoodieSchema(Type type, Ma
*/
private static HoodieSchema visitInternalRecordToBuildHoodieRecord(Types.RecordType recordType, List<HoodieSchema> fieldSchemas, String recordNameFallback) {
List<Types.Field> fields = recordType.fields();
+
+ // Detect Variant round-trip: sentinel negative IDs with value/metadata fields
+ if (fields.size() == 2) {
+ Types.Field field0 = fields.get(0);
+ Types.Field field1 = fields.get(1);
+ boolean hasNegativeIds = field0.fieldId() < 0 && field1.fieldId() < 0;
+ boolean hasVariantFields = (field0.name().equals(HoodieSchema.Variant.VARIANT_VALUE_FIELD)
+ && field1.name().equals(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
+ || (field0.name().equals(HoodieSchema.Variant.VARIANT_METADATA_FIELD)
+ && field1.name().equals(HoodieSchema.Variant.VARIANT_VALUE_FIELD));
+
+ if (hasNegativeIds && hasVariantFields) {
+ // TODO: Flesh out schema evolution for Variant types #18285
Review Comment:
@voonhous: I think it would be better to check how Spark handles manipulation of the internal byte fields when it operates directly on a Parquet dataset. Can you add some documentation on this? That would help us cross-check, and reaching parity with Spark's behavior would be good.
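The detection rule in the second hunk can be sketched as a standalone predicate. This is a minimal sketch, not the Hudi implementation: `Field` is a hypothetical stand-in for `Types.Field`, and the two constants assume that `VARIANT_VALUE_FIELD` is "value" and `VARIANT_METADATA_FIELD` is "metadata", matching the Flink layout in the first hunk.

```java
import java.util.List;

final class VariantDetectionSketch {
  // Assumed values of HoodieSchema.Variant.VARIANT_VALUE_FIELD / VARIANT_METADATA_FIELD.
  static final String VALUE_FIELD = "value";
  static final String METADATA_FIELD = "metadata";

  // Hypothetical stand-in for Types.Field: only the id and name matter here.
  static final class Field {
    final int id;
    final String name;

    Field(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  private VariantDetectionSketch() {}

  // True only for a two-field record whose ids are both negative sentinels
  // and whose names are value/metadata in either order.
  static boolean isVariantRoundTrip(List<Field> fields) {
    if (fields.size() != 2) {
      return false;
    }
    Field f0 = fields.get(0);
    Field f1 = fields.get(1);
    boolean hasNegativeIds = f0.id < 0 && f1.id < 0;
    boolean hasVariantFields =
        (f0.name.equals(VALUE_FIELD) && f1.name.equals(METADATA_FIELD))
            || (f0.name.equals(METADATA_FIELD) && f1.name.equals(VALUE_FIELD));
    return hasNegativeIds && hasVariantFields;
  }
}
```

Because the name check is order-insensitive, a record read back as (metadata, value) is still recognized. Requiring negative sentinel IDs presumably keeps an ordinary user record that happens to use these two field names from being misclassified as a Variant.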