voonhous commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3371707191
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +292,181 @@ public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}
+ /**
+  * Looks up where the given effective field index occurs in {@link #shreddedVariantFieldIndices}.
+  *
+  * @param effectiveFieldIndex the effective field index to search for
+  * @return the position of the first matching entry, or -1 if no entry matches
+  *         (i.e. the field is not a variant field that needs shredding)
+  */
+ private int findVariantIndex(int effectiveFieldIndex) {
+   int position = 0;
+   while (position < shreddedVariantFieldIndices.length) {
+     if (shreddedVariantFieldIndices[position] == effectiveFieldIndex) {
+       return position;
+     }
+     position++;
+   }
+   // Scanned every entry without a match.
+   return -1;
+ }
+
+ /**
+  * Matches a {@code decimal(<digits>, <digits>)} type spec with optional whitespace around the
+  * parentheses and the comma. Group 1 captures the first number and group 2 the second; the
+  * parser in this file passes them, in that order, to {@code HoodieSchema.createDecimal}.
+  * The literal {@code decimal} is lowercase only, so callers must lowercase the input first.
+  */
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+ "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+ /**
+  * Rewrites every variant field of a record schema so that it carries the forced shredding schema.
+  *
+  * <p>The forced schema DDL (e.g., {@code "a int, b string"}) lists the typed_value fields that are
+  * added to each variant column. A non-record schema, or a record without any variant field, is
+  * returned unchanged.
+  *
+  * @param schema the schema whose variant fields should be shredded
+  * @param ddl the forced shredding schema in DDL form
+  * @return a new record schema with shredded variant fields, or {@code schema} itself when nothing changed
+  */
+ private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) {
+   if (schema.getType() != HoodieSchemaType.RECORD) {
+     return schema;
+   }
+
+   // Parsed before looking at the fields, so a malformed DDL is rejected even if no variant exists.
+   Map<String, HoodieSchema> typedValueFields = parseShreddingDDL(ddl);
+
+   List<HoodieSchemaField> rewrittenFields = new ArrayList<>();
+   boolean anyVariantShredded = false;
+
+   for (HoodieSchemaField current : schema.getFields()) {
+     HoodieSchema declared = current.schema();
+     boolean nullable = declared.isNullable();
+     HoodieSchema bare = declared;
+     if (nullable) {
+       bare = declared.getNonNullType();
+     }
+
+     // Anything that is not a variant is carried over as a fresh copy of the field.
+     if (bare.getType() != HoodieSchemaType.VARIANT) {
+       rewrittenFields.add(HoodieSchemaUtils.createNewSchemaField(current));
+       continue;
+     }
+
+     HoodieSchema.Variant shredded = HoodieSchema.createVariantShreddedObject(
+         bare.getAvroSchema().getName(),
+         bare.getAvroSchema().getNamespace(),
+         bare.getAvroSchema().getDoc(),
+         typedValueFields);
+
+     // Re-wrap in a nullable schema when the original field schema was nullable.
+     HoodieSchema replacement;
+     if (nullable) {
+       replacement = HoodieSchema.createNullable(shredded);
+     } else {
+       replacement = shredded;
+     }
+
+     rewrittenFields.add(
+         HoodieSchemaUtils.createNewSchemaField(current.makeNullable().withSchema(replacement)));
+     anyVariantShredded = true;
+   }
+
+   if (anyVariantShredded) {
+     return HoodieSchema.createRecord(
+         schema.getAvroSchema().getName(),
+         schema.getAvroSchema().getNamespace(),
+         schema.getAvroSchema().getDoc(),
+         rewrittenFields);
+   }
+   return schema;
+ }
+
+ /**
+  * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"})
+  * into a map of field names to their HoodieSchema types, in declaration order.
+  *
+  * <p>Field definitions are separated by commas that are not enclosed in parentheses, so the
+  * comma inside {@code decimal(p,s)} does not split a definition. Empty definitions (for example
+  * from a trailing comma) are ignored. A repeated field name keeps the last type given.
+  *
+  * @param ddl the shredding schema in DDL form
+  * @return an insertion-ordered map from field name to parsed type
+  * @throws IllegalArgumentException if a definition is not of the form {@code name type}, or if
+  *         its type is rejected by {@code parseSimpleType}
+  */
+ private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+   Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+   int depth = 0;        // current parenthesis nesting level
+   int segmentStart = 0; // index where the current field definition begins
+   int length = ddl.length();
+
+   // Iterate one past the end so the final definition is flushed by the same code path.
+   for (int i = 0; i <= length; i++) {
+     if (i < length) {
+       char c = ddl.charAt(i);
+       if (c == '(') {
+         depth++;
+         continue;
+       }
+       if (c == ')') {
+         if (depth > 0) {
+           depth--;
+         }
+         continue;
+       }
+       // Only a comma outside of parentheses terminates a field definition.
+       if (c != ',' || depth > 0) {
+         continue;
+       }
+     }
+
+     String trimmed = ddl.substring(segmentStart, i).trim();
+     segmentStart = i + 1;
+     if (trimmed.isEmpty()) {
+       continue;
+     }
+     String[] parts = trimmed.split("\\s+", 2);
+     if (parts.length != 2) {
+       throw new IllegalArgumentException(
+           "Invalid shredding DDL field definition (expected 'name type'): " + trimmed);
+     }
+     fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+   }
+   return fields;
+ }
+
+ /**
+  * Parses a simple type name into a HoodieSchema.
+  *
+  * <p>Supported names, matched case-insensitively: {@code int}/{@code integer},
+  * {@code long}/{@code bigint}, {@code string}, {@code double}, {@code float}, {@code boolean},
+  * {@code binary} and {@code decimal(p,s)}.
+  *
+  * @param type the type name to parse
+  * @return the schema corresponding to {@code type}
+  * @throws IllegalArgumentException if {@code type} is not one of the supported names
+  */
+ private static HoodieSchema parseSimpleType(String type) {
+   // Locale.ROOT keeps the folding locale-independent; with the default locale a Turkish JVM
+   // would lowercase "INT" to a dotless-i form that matches none of the cases below.
+   String lower = type.toLowerCase(java.util.Locale.ROOT);
+   switch (lower) {
+     case "int":
+     case "integer":
+       return HoodieSchema.create(HoodieSchemaType.INT);
+     case "long":
+     case "bigint":
+       return HoodieSchema.create(HoodieSchemaType.LONG);
+     case "string":
+       return HoodieSchema.create(HoodieSchemaType.STRING);
+     case "double":
+       return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+     case "float":
+       return HoodieSchema.create(HoodieSchemaType.FLOAT);
+     case "boolean":
+       return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+     case "binary":
+       return HoodieSchema.create(HoodieSchemaType.BYTES);
+     default:
+       // Not a plain keyword: the only remaining supported form is decimal(p,s).
+       Matcher m = DECIMAL_PATTERN.matcher(lower);
+       if (m.matches()) {
+         return HoodieSchema.createDecimal(
+             Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+       }
+       throw new IllegalArgumentException("Unsupported shredding type: " + type);
+   }
+ }
+
+ /**
+ * Strips shredding from variant fields in the schema.
+ * Replaces shredded variant fields with unshredded variants (removing
typed_value).
+ */
+ private static HoodieSchema unshreddVariantFields(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
Review Comment:
Done, renamed to `stripVariantShredding`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]