voonhous commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3428436244
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +325,179 @@ public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}
+ /**
+ * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a
shredded variant field,
+ * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+ */
+ private static final class ShreddedVariantField {
+ private final Schema avroSchema;
+ private final HoodieSchema.Variant hoodieSchema;
+
+ ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema)
{
+ this.avroSchema = avroSchema;
+ this.hoodieSchema = hoodieSchema;
+ }
+ }
+
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+ "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+ /**
+ * Applies a forced shredding schema to all variant fields in the given
schema.
+ * The forced schema DDL (e.g., {@code "a int, b string"}) defines the
typed_value
+ * fields that will be added to each variant column.
+ */
+ private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema,
String ddl) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() :
fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant shreddedVariant =
HoodieSchema.createVariantShreddedObject(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc(),
+ shreddedFields);
+ HoodieSchema replacement = wasNullable
+ ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+
newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+ changed = true;
+ } else {
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+ }
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+
+ /**
+ * Parses a DDL-style shredding schema string (e.g., {@code "a int, b
string, c decimal(15,1)"})
+ * into a map of field names to their HoodieSchema types.
+ */
+ private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+ Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+ // Split on top-level commas only so parameterized types such as
decimal(15, 1) survive intact.
+ for (String fieldDef : StringUtils.splitTopLevelCommas(ddl)) {
+ String[] parts = fieldDef.split("\\s+", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid shredding DDL field definition (expected 'name type'): "
+ fieldDef);
+ }
+ fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+ }
+ return fields;
+ }
+
+ /**
+ * Parses a simple type name into a HoodieSchema.
+ * Supports common types: int, long, string, double, float, boolean, binary,
decimal(p,s).
+ */
+ private static HoodieSchema parseSimpleType(String type) {
+ String lower = type.toLowerCase();
+ switch (lower) {
+ case "int":
+ case "integer":
+ return HoodieSchema.create(HoodieSchemaType.INT);
+ case "long":
+ case "bigint":
+ return HoodieSchema.create(HoodieSchemaType.LONG);
+ case "string":
+ return HoodieSchema.create(HoodieSchemaType.STRING);
+ case "double":
+ return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+ case "float":
+ return HoodieSchema.create(HoodieSchemaType.FLOAT);
+ case "boolean":
+ return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+ case "binary":
+ return HoodieSchema.create(HoodieSchemaType.BYTES);
+ default:
+ Matcher m = DECIMAL_PATTERN.matcher(lower);
+ if (m.matches()) {
+ return HoodieSchema.createDecimal(
+ Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+ }
+ throw new IllegalArgumentException("Unsupported shredding type: " +
type);
+ }
+ }
+
+ /**
+ * Strips shredding from variant fields in the schema.
+ * Replaces shredded variant fields with unshredded variants (removing
typed_value).
+ */
+ private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() :
fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+ if (variant.isShredded()) {
+ // Replace with unshredded variant
+ HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc());
+ HoodieSchema replacement = wasNullable ?
HoodieSchema.createNullable(unshredded) : unshredded;
+ newFields.add(field.withSchema(replacement));
+ changed = true;
+ continue;
+ }
+ }
+ newFields.add(field);
+ }
+
+ if (!changed) {
+ return schema;
+ }
Review Comment:
Good eye on the shared scaffolding, but I'd prefer to keep these two methods
separate. They look alike, but they differ in three ways that resist a clean
single helper:
1. Scope: `applyForcedShreddingSchema` rewrites every variant field;
`stripVariantShredding` only touches variants for which `isShredded()` is true.
2. Replacement: the forced path substitutes a shredded-object variant built
from the forced DDL; the strip path substitutes a plain unshredded variant.
3. Field rebuild: the forced path calls `field.makeNullable().withSchema(...)`
and clones every field via `createNewSchemaField`; the strip path uses
`field.withSchema(...)` for changed fields and reuses the raw field object for
unchanged ones.
A `Function<Variant, HoodieSchema>` helper could cover differences 1 and 2, but
difference 3 can't be merged without forcing one rebuild mechanic onto both
paths. The forced path is test-only (`force.shredding.schema.for.test`), whereas
the strip path runs in production (compaction/clustering when shredding is
disabled), so normalizing its field handling would be a behavioral change that
I'd rather not make in response to a nit. Net, a parameterized helper would
either change behavior or need so many flags that it would stop being clearer
than the two explicit methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]