This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-1.0.0-beta2
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-1.0.0-beta2 by this 
push:
     new dda09b4e11a Ensure properties are copied when modifying schema (#11441)
dda09b4e11a is described below

commit dda09b4e11a27fad09512447effeb38845798b56
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jun 13 11:54:13 2024 -0400

    Ensure properties are copied when modifying schema (#11441)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../SparkFileFormatInternalRowReaderContext.scala  |  6 ++---
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 29 ++++++++++++----------
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 18 ++++----------
 .../read/HoodieFileGroupReaderSchemaHandler.java   |  3 +++
 4 files changed, 27 insertions(+), 29 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 715e2d9a9ab..92389571c13 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -23,7 +23,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
 getAppliedRequiredSchema}
-import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -159,14 +159,14 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
       rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
       //always remove the row index column from the skeleton because the data 
file will also have the same column
       val skeletonProjection = projectRecord(skeletonRequiredSchema,
-        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, 
rowIndexColumn))
+        HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))
 
       //If we need to do position based merging with log files we will leave 
the row index column at the end
       val dataProjection = if (getHasLogFiles && 
getShouldMergeUseRecordPosition) {
         getIdentityProjection
       } else {
         projectRecord(dataRequiredSchema,
-          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, 
rowIndexColumn))
+          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
       }
 
       //row index will always be the last column
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index da3ab4824b8..642624e0ed3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -236,7 +237,7 @@ public class AvroSchemaUtils {
       isUnion = true;
       foundSchema = resolveNullableSchema(foundSchema);
     }
-    Schema newSchema = Schema.createRecord(foundSchema.getName(), 
foundSchema.getDoc(), foundSchema.getNamespace(), false, 
Collections.singletonList(nestedPart.get()));
+    Schema newSchema = createNewSchemaFromFieldsWithReference(foundSchema, 
Collections.singletonList(nestedPart.get()));
     return Option.of(new Schema.Field(foundField.name(), isUnion ? 
createNullableSchema(newSchema) : newSchema, foundField.doc(), 
foundField.defaultVal()));
   }
 
@@ -259,10 +260,7 @@ public class AvroSchemaUtils {
         fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal()));
       }
     }
-
-    Schema newSchema = Schema.createRecord(a.getName(), a.getDoc(), 
a.getNamespace(), a.isError());
-    newSchema.setFields(fields);
-    return newSchema;
+    return createNewSchemaFromFieldsWithReference(a, fields);
   }
 
   /**
@@ -292,17 +290,22 @@ public class AvroSchemaUtils {
       fields.addAll(newFields);
     }
 
-    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), 
schema.getNamespace(), schema.isError());
-    newSchema.setFields(fields);
-    return newSchema;
+    return createNewSchemaFromFieldsWithReference(schema, fields);
   }
 
-  public static Schema removeFieldsFromSchema(Schema schema, Set<String> 
fieldNames) {
-    List<Schema.Field> fields = schema.getFields().stream()
-        .filter(field -> !fieldNames.contains(field.name()))
-        .map(field -> new Schema.Field(field.name(), field.schema(), 
field.doc(), field.defaultVal()))
-        .collect(Collectors.toList());
+  /**
+   * Create a new schema but maintain all meta info from the old schema
+   *
+   * @param schema schema to get the meta info from
+   * @param fields list of fields in order that will be in the new schema
+   *
+   * @return schema with fields from fields, and metadata from schema
+   */
+  public static Schema createNewSchemaFromFieldsWithReference(Schema schema, 
List<Schema.Field> fields) {
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), 
schema.getNamespace(), schema.isError());
+    for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
+      newSchema.addProp(prop.getKey(), prop.getValue());
+    }
     newSchema.setFields(fields);
     return newSchema;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index be149a046bc..fb936468466 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -99,6 +99,7 @@ import java.util.stream.Collectors;
 import static org.apache.avro.Schema.Type.ARRAY;
 import static org.apache.avro.Schema.Type.MAP;
 import static org.apache.avro.Schema.Type.UNION;
+import static 
org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
@@ -338,13 +339,7 @@ public class HoodieAvroUtils {
         parentFields.add(newField);
       }
     }
-
-    Schema mergedSchema = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
-    for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
-      mergedSchema.addProp(prop.getKey(), prop.getValue());
-    }
-    mergedSchema.setFields(parentFields);
-    return mergedSchema;
+    return createNewSchemaFromFieldsWithReference(schema, parentFields);
   }
 
   public static boolean isSchemaNull(Schema schema) {
@@ -364,9 +359,8 @@ public class HoodieAvroUtils {
         .filter(field -> !fieldsToRemove.contains(field.name()))
         .map(field -> new Schema.Field(field.name(), field.schema(), 
field.doc(), field.defaultVal()))
         .collect(Collectors.toList());
-    Schema filteredSchema = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
-    filteredSchema.setFields(filteredFields);
-    return filteredSchema;
+
+    return createNewSchemaFromFieldsWithReference(schema, filteredFields);
   }
 
   public static String addMetadataColumnTypes(String hiveColumnTypes) {
@@ -385,9 +379,7 @@ public class HoodieAvroUtils {
           }
         })
         .collect(Collectors.toList());
-    Schema withNonNullField = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
-    withNonNullField.setFields(filteredFields);
-    return withNonNullField;
+    return createNewSchemaFromFieldsWithReference(schema, filteredFields);
   }
 
   private static Schema initRecordKeySchema() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 1f29fc8894d..85d0cef2893 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -179,6 +179,9 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
       fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), 
curr.defaultVal()));
     }
     Schema newSchema = Schema.createRecord(dataSchema.getName(), 
dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+    for (Map.Entry<String, Object> prop : 
dataSchema.getObjectProps().entrySet()) {
+      newSchema.addProp(prop.getKey(), prop.getValue());
+    }
     newSchema.setFields(fields);
     return newSchema;
   }

Reply via email to