This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch release-1.0.0-beta2
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-1.0.0-beta2 by this push:
new dda09b4e11a Ensure properties are copied when modifying schema (#11441)
dda09b4e11a is described below
commit dda09b4e11a27fad09512447effeb38845798b56
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jun 13 11:54:13 2024 -0400
Ensure properties are copied when modifying schema (#11441)
Co-authored-by: Jonathan Vexler <=>
---
.../SparkFileFormatInternalRowReaderContext.scala | 6 ++---
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 29 ++++++++++++----------
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 18 ++++----------
.../read/HoodieFileGroupReaderSchemaHandler.java | 3 +++
4 files changed, 27 insertions(+), 29 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 715e2d9a9ab..92389571c13 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -23,7 +23,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, getAppliedRequiredSchema}
-import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -159,14 +159,14 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
//always remove the row index column from the skeleton because the data file will also have the same column
val skeletonProjection = projectRecord(skeletonRequiredSchema,
- AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn))
+ HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))
//If we need to do position based merging with log files we will leave the row index column at the end
val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
getIdentityProjection
} else {
projectRecord(dataRequiredSchema,
- AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn))
+ HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
}
//row index will always be the last column
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index da3ab4824b8..642624e0ed3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
@@ -236,7 +237,7 @@ public class AvroSchemaUtils {
isUnion = true;
foundSchema = resolveNullableSchema(foundSchema);
}
- Schema newSchema = Schema.createRecord(foundSchema.getName(), foundSchema.getDoc(), foundSchema.getNamespace(), false, Collections.singletonList(nestedPart.get()));
+ Schema newSchema = createNewSchemaFromFieldsWithReference(foundSchema, Collections.singletonList(nestedPart.get()));
return Option.of(new Schema.Field(foundField.name(), isUnion ? createNullableSchema(newSchema) : newSchema, foundField.doc(), foundField.defaultVal()));
}
@@ -259,10 +260,7 @@ public class AvroSchemaUtils {
fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
}
}
-
- Schema newSchema = Schema.createRecord(a.getName(), a.getDoc(), a.getNamespace(), a.isError());
- newSchema.setFields(fields);
- return newSchema;
+ return createNewSchemaFromFieldsWithReference(a, fields);
}
/**
@@ -292,17 +290,22 @@ public class AvroSchemaUtils {
fields.addAll(newFields);
}
- Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
- newSchema.setFields(fields);
- return newSchema;
+ return createNewSchemaFromFieldsWithReference(schema, fields);
}
- public static Schema removeFieldsFromSchema(Schema schema, Set<String> fieldNames) {
- List<Schema.Field> fields = schema.getFields().stream()
- .filter(field -> !fieldNames.contains(field.name()))
- .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
- .collect(Collectors.toList());
+ /**
+ * Create a new schema but maintain all meta info from the old schema
+ *
+ * @param schema schema to get the meta info from
+ * @param fields list of fields in order that will be in the new schema
+ *
+ * @return schema with fields from fields, and metadata from schema
+ */
+ public static Schema createNewSchemaFromFieldsWithReference(Schema schema, List<Schema.Field> fields) {
Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
+ for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
+ newSchema.addProp(prop.getKey(), prop.getValue());
+ }
newSchema.setFields(fields);
return newSchema;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index be149a046bc..fb936468466 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -99,6 +99,7 @@ import java.util.stream.Collectors;
import static org.apache.avro.Schema.Type.ARRAY;
import static org.apache.avro.Schema.Type.MAP;
import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
@@ -338,13 +339,7 @@ public class HoodieAvroUtils {
parentFields.add(newField);
}
}
-
- Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
- for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
- mergedSchema.addProp(prop.getKey(), prop.getValue());
- }
- mergedSchema.setFields(parentFields);
- return mergedSchema;
+ return createNewSchemaFromFieldsWithReference(schema, parentFields);
}
public static boolean isSchemaNull(Schema schema) {
@@ -364,9 +359,8 @@ public class HoodieAvroUtils {
.filter(field -> !fieldsToRemove.contains(field.name()))
.map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
.collect(Collectors.toList());
- Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
- filteredSchema.setFields(filteredFields);
- return filteredSchema;
+
+ return createNewSchemaFromFieldsWithReference(schema, filteredFields);
}
public static String addMetadataColumnTypes(String hiveColumnTypes) {
@@ -385,9 +379,7 @@ public class HoodieAvroUtils {
}
})
.collect(Collectors.toList());
- Schema withNonNullField = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
- withNonNullField.setFields(filteredFields);
- return withNonNullField;
+ return createNewSchemaFromFieldsWithReference(schema, filteredFields);
}
private static Schema initRecordKeySchema() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 1f29fc8894d..85d0cef2893 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -179,6 +179,9 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), curr.defaultVal()));
}
Schema newSchema = Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+ for (Map.Entry<String, Object> prop : dataSchema.getObjectProps().entrySet()) {
+ newSchema.addProp(prop.getKey(), prop.getValue());
+ }
newSchema.setFields(fields);
return newSchema;
}