This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3b8e782407ee036c859105ac8bf8518c73cd00f7
Author: xiarixiaoyao <[email protected]>
AuthorDate: Thu Apr 21 11:07:50 2022 +0800

    [address comments and add more test]
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  8 +--
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 66 +++++++++++++++-------
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 31 ++++------
 3 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index ab8a3d7033..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -102,7 +102,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
   // For full schema evolution
-  protected final boolean schemaOnReadEnable;
+  protected final boolean schemaOnReadEnabled;

   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
                            HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -125,7 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
-    schemaOnReadEnable = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }

   /**
@@ -230,12 +230,12 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return schemaOnReadEnable ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
         : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }

   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return schemaOnReadEnable ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
         : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37d84a2895..47be7117a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -65,13 +65,14 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.TimeZone;
 import java.util.stream.Collectors;

@@ -748,11 +749,24 @@ public class HoodieAvroUtils {
    * @return newRecord for new Schema
    */
   public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols);
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
     return (GenericData.Record) newRecord;
   }

-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+  /**
+   * Given an Avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * Supports deep rewrites for nested records and handles rename operations.
+   * This particular method does the following things:
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema.
+   *
+   * @param oldRecord  oldRecord to be rewritten
+   * @param newSchema  newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   * @param fieldNames tracks the full name of the field currently visited while traversing the new schema
+   * @return newRecord for the new schema
+   */
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
@@ -767,23 +781,23 @@ public class HoodieAvroUtils {

       for (int i = 0; i < fields.size(); i++) {
         Schema.Field field = fields.get(i);
+        String fieldName = field.name();
+        fieldNames.push(fieldName);
         if (oldSchema.getField(field.name()) != null) {
           Schema.Field oldField = oldSchema.getField(field.name());
-          helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
-        }
-        // deal with rename
-        if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) {
-          String fieldName = field.name();
-          for (Map.Entry<String, String> entry : renameCols.entrySet()) {
-            List<String> nameParts = Arrays.asList(entry.getKey().split("\\."));
-            List<String> namePartsOld = Arrays.asList(entry.getValue().split("\\."));
-            if (nameParts.get(nameParts.size() - 1).equals(fieldName) && oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)) != null) {
-              // find rename
-              Schema.Field oldField = oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1));
-              helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
-            }
+          helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+        } else {
+          String fieldFullName = createFullName(fieldNames);
+          String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+          String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+          // deal with rename
+          if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+            // find rename
+            Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
           }
         }
+        fieldNames.pop();
       }
       GenericData.Record newRecord = new GenericData.Record(newSchema);
       for (int i = 0; i < fields.size(); i++) {
@@ -804,9 +818,11 @@ public class HoodieAvroUtils {
       }
       Collection array = (Collection)oldRecord;
       List<Object> newArray = new ArrayList();
+      fieldNames.push("element");
       for (Object element : array) {
-        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols));
+        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
       }
+      fieldNames.pop();
       return newArray;
     case MAP:
       if (!(oldRecord instanceof Map)) {
@@ -814,17 +830,29 @@ public class HoodieAvroUtils {
       }
       Map<Object, Object> map = (Map<Object, Object>) oldRecord;
       Map<Object, Object> newMap = new HashMap<>();
+      fieldNames.push("value");
       for (Map.Entry<Object, Object> entry : map.entrySet()) {
-        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols));
+        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
       }
+      fieldNames.pop();
       return newMap;
     case UNION:
-      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols);
+      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
     default:
       return rewritePrimaryType(oldRecord, oldSchema, newSchema);
   }
 }

+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining("."));
+    }
+    return result;
+  }
+
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
     Schema realOldSchema = oldSchema;
     if (realOldSchema.getType() == UNION) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
             Seq(null),
             Seq(Map("t1" -> 10.0d))
           )
+          spark.sql(s"alter table ${tableName} rename column members to mem")
+          spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+          spark.sql(s"alter table ${tableName} rename column userx to us")
+          spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+          spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+          checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+            Seq(null, 29),
+            Seq(null, 291)
+          )
         }
       }
     }
   }
-
-  private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
-    writeDf.write.format("org.apache.hudi")
-      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
-      .option("hoodie.upsert.shuffle.parallelism", "1")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
-      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-      .option("hoodie.schema.on.read.enable", "true")
-      // option for clustering
-      .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
-      .mode(SaveMode.Append)
-      .save(basePath)
-  }
 }
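For context on the HoodieAvroUtils change above, here is a minimal, hypothetical usage sketch (not part of the commit) of the public rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols) entry point. The record schema and the id/fn/firstName names are invented for illustration; the only behaviour taken from the patch is the renameCols contract, where each key is the full dotted column name as it appears in the new schema and each value is the name that column had in the old schema.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

import java.util.Collections;

public class RewriteRecordWithRenameExample {
  public static void main(String[] args) {
    // Hypothetical schemas: the old schema calls the column "fn", the new schema renames it to "firstName".
    Schema oldSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .requiredString("fn")
        .endRecord();
    Schema newSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .requiredString("firstName")
        .endRecord();

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", 1);
    oldRecord.put("fn", "jack");

    // renameCols: key = full dotted column name in the new schema, value = column name in the old schema.
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithNewSchema(
        oldRecord, newSchema, Collections.singletonMap("firstName", "fn"));

    System.out.println(rewritten); // expected: {"id": 1, "firstName": "jack"}
  }
}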
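A second standalone sketch, again not code from the patch itself: it copies the logic of the new private createFullName helper into a small class to show how the Deque that rewriteRecordWithNewSchema threads through the traversal becomes the dotted key looked up in renameCols. The mem/value/nn names are chosen to echo the columns renamed in the TestSpark3DDL test above (members to mem, n to nn), with the map value level contributing the literal "value" segment.

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

public class FieldNameStackSketch {
  // Mirror of the private HoodieAvroUtils.createFullName(Deque<String>) added by this commit:
  // the deque is used as a stack, so iterating in descending order restores the outer-to-inner path.
  static String createFullName(Deque<String> fieldNames) {
    String result = "";
    if (!fieldNames.isEmpty()) {
      List<String> parentNames = new ArrayList<>();
      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
      result = parentNames.stream().collect(Collectors.joining("."));
    }
    return result;
  }

  public static void main(String[] args) {
    Deque<String> fieldNames = new LinkedList<>();
    // One push per level of the new schema being traversed:
    fieldNames.push("mem");    // record field (renamed from "members")
    fieldNames.push("value");  // map value level, pushed as the literal "value"
    fieldNames.push("nn");     // nested field (renamed from "n")
    System.out.println(createFullName(fieldNames)); // prints: mem.value.nn
  }
}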
