This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a5f6168393f1d75f839bfab2a4636f06877a7f1
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Apr 21 18:15:54 2022 +0800

    nested field patch

    apply from https://patch-diff.githubusercontent.com/raw/apache/hudi/pull/5379.patch
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 +++++++++++++++-------
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  7 +++++
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 +++++---
 .../hudi/functional/TestMORDataSourceStorage.scala | 28 ++++++++++++-------
 4 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 41be0b00c0..47be7117a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@

 package org.apache.hudi.avro;

+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -484,6 +485,17 @@ public class HoodieAvroUtils {
     return projectedSchema;
   }

+  /**
+   * Obtain the root-level field name of a full field name, possibly a nested field.
+   * For example, given "a.b.c", the output is "a"; given "a", the output is "a".
+   *
+   * @param fieldName The field name.
+   * @return Root-level field name
+   */
+  public static String getRootLevelFieldName(String fieldName) {
+    return fieldName.split("\\.")[0];
+  }
+
   /**
    * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
   */
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 246d74411d..bd0254da3d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -257,6 +257,13 @@ public class TestHoodieAvroUtils {
     assertEquals(expectedSchema, rec1.getSchema());
   }

+  @Test
+  public void testGetRootLevelFieldName() {
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
+    assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
+    assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
+  }
+
   @Test
   public void testGetNestedFieldVal() {
     GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f776d08ec9..aac57e1bbb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -363,7 +362,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }

   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+    // For a nested field in mandatory columns, we should first get the root-level field, and then
+    // check for any missing column, as the requestedColumns should only contain root-level fields
+    // We should only append root-level field as well
+    val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
+      .filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 18b639f2f9..8cf6b4174c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.CsvSource

 import scala.collection.JavaConversions._

@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   val updatedVerificationVal: String = "driver_update"

   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
-    val dataGen = new HoodieTestDataGenerator()
+  @CsvSource(Array(
+    "true,",
+    "true,fare.currency",
+    "false,",
+    "false,fare.currency"
+  ))
+  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preComineField: String) {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
+    if (!StringUtils.isNullOrEmpty(preComineField)) {
+      options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preComineField)
+    }
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)

@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)

@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     inputDF3.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
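For context: the core of this patch is that a mandatory column such as the precombine field may now be a nested field (e.g. "fare.currency"), while the requested columns handled by HoodieBaseRelation only carry root-level field names. The sketch below is illustrative only and not part of the commit; the object and method names are hypothetical stand-ins that mirror the HoodieAvroUtils.getRootLevelFieldName and appendMandatoryColumns logic shown in the diff.

  // Minimal Scala sketch (hypothetical names) of the root-level-field handling in this patch.
  object RootLevelFieldSketch {

    // Mirrors HoodieAvroUtils.getRootLevelFieldName: "a.b.c" -> "a", "a" -> "a".
    def getRootLevelFieldName(fieldName: String): String =
      fieldName.split("\\.")(0)

    // Mirrors the patched HoodieBaseRelation.appendMandatoryColumns: nested mandatory
    // columns are reduced to their root-level field before checking what is missing
    // from the requested columns, so only root-level fields are ever appended.
    def appendMandatoryColumns(requestedColumns: Array[String],
                               mandatoryColumns: Array[String]): Array[String] = {
      val missing = mandatoryColumns
        .map(getRootLevelFieldName)
        .filter(rootField => !requestedColumns.contains(rootField))
      requestedColumns ++ missing
    }

    def main(args: Array[String]): Unit = {
      val requested = Array("_row_key", "fare")
      val mandatory = Array("_row_key", "fare.currency")
      // Before the patch, "fare.currency" would be appended verbatim even though the
      // root-level column "fare" is already requested; after the patch nothing is added.
      println(appendMandatoryColumns(requested, mandatory).mkString(", "))
      // prints: _row_key, fare
    }
  }

The updated TestMORDataSourceStorage exercises this end to end: each CsvSource row optionally sets DataSourceWriteOptions.PRECOMBINE_FIELD to the nested field "fare.currency", so the merge-on-read storage test has to resolve the nested precombine column through its root-level field.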
