This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 0b9eaf31cf5 [HUDI-7874] Ensure Parquet can interoperate different
level structures (#11461)
0b9eaf31cf5 is described below
commit 0b9eaf31cf56f30528ae557f205d43a784ccde0e
Author: Vitali Makarevich <[email protected]>
AuthorDate: Wed Jun 19 20:28:46 2024 +0200
[HUDI-7874] Ensure Parquet can interoperate different level structures
(#11461)
Co-authored-by: vmakarevich <[email protected]>
---
.../apache/parquet/avro/HoodieAvroReadSupport.java | 23 +-
.../hudi/TestParquetReaderCompatibility.scala | 341 +++++++++++++++++++++
2 files changed, 359 insertions(+), 5 deletions(-)
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
index 326accb66b2..cd39dcf6388 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
@@ -46,11 +46,7 @@ public class HoodieAvroReadSupport<T> extends
AvroReadSupport<T> {
@Override
public ReadContext init(Configuration configuration, Map<String, String>
keyValueMetaData, MessageType fileSchema) {
boolean legacyMode = checkLegacyMode(fileSchema.getFields());
- // support non-legacy list
- if (!legacyMode &&
configuration.get(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE) == null) {
- configuration.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
- "false", "support reading avro from non-legacy map/list in parquet
file");
- }
+ adjustConfToReadWithFileProduceMode(legacyMode, configuration);
ReadContext readContext = super.init(configuration, keyValueMetaData,
fileSchema);
MessageType requestedSchema = readContext.getRequestedSchema();
// support non-legacy map. Convert non-legacy map to legacy map
@@ -62,6 +58,23 @@ public class HoodieAvroReadSupport<T> extends
AvroReadSupport<T> {
return new ReadContext(requestedSchema,
readContext.getReadSupportMetadata());
}
+ /**
+ * Here we want to set the config with which the file has been written.
+ * Even though the user may have overridden {@link AvroWriteSupport#WRITE_OLD_LIST_STRUCTURE},
+ * that setting only controls how new files are produced (this is the read path).
+ * Later the config value {@link AvroWriteSupport#WRITE_OLD_LIST_STRUCTURE} will still be used
+ * to write new files according to the user's preferences.
+ **/
+ private void adjustConfToReadWithFileProduceMode(Boolean
isLegacyModeWrittenFile, Configuration configuration) {
+ if (isLegacyModeWrittenFile) {
+ configuration.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
+ "true", "support reading avro from legacy map/list in parquet file");
+ } else {
+ configuration.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
+ "false", "support reading avro from non-legacy map/list in parquet
file");
+ }
+ }
+
/**
* Check whether write map/list with legacy mode.
* legacy:
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestParquetReaderCompatibility.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestParquetReaderCompatibility.scala
new file mode 100644
index 00000000000..33f1fe680d4
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestParquetReaderCompatibility.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import
org.apache.hudi.TestParquetReaderCompatibility.NullabilityEnum.{NotNullable,
Nullability, Nullable}
+import org.apache.hudi.TestParquetReaderCompatibility.{SparkSetting,
TestScenario, ThreeLevel, TwoLevel}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.ParquetTableSchemaResolver
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.storage.HoodieIOFactory
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{ArrayType, LongType, StringType,
StructField, StructType}
+import org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER
+import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration,
HoodieHadoopStorage}
+import org.apache.parquet.schema.OriginalType
+import org.apache.spark.SparkConf
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Collections
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object TestParquetReaderCompatibility {
+ val listFieldName = "internal_list"
+ abstract class SparkSetting {
+ def value: String
+ def overrideConf(conf: SparkConf): Unit
+ }
+
+ // Explicitly set 2 level
+ case object TwoLevel extends SparkSetting {
+ val value: String = "TwoLevel"
+ def overrideConf(conf: SparkConf): Unit = {
+ conf.set("spark.hadoop.parquet.avro.write-old-list-structure",
true.toString)
+ }
+ }
+
+ // Explicitly set 3 level
+ case object ThreeLevel extends SparkSetting {
+ val value: String = "ThreeLevel"
+
+ def overrideConf(conf: SparkConf): Unit = {
+ conf.set("spark.hadoop.parquet.avro.write-old-list-structure",
false.toString)
+ }
+ }
+
+ // Set nothing (likely what most users do) - the default is 2 level inside the Avro code.
+ case object Default extends SparkSetting {
+ def value: String = "TwoLevel"
+
+ def overrideConf(conf: SparkConf): Unit = {}
+ }
+
+ val cases: Seq[SparkSetting] = Seq(TwoLevel, ThreeLevel, Default)
+
+ object NullabilityEnum extends Enumeration {
+ type Nullability = Value
+ val Nullable: NullabilityEnum.Value = Value("Nullable")
+ val NotNullable: NullabilityEnum.Value = Value("NotNullable")
+ }
+
+ case class TestScenario(initialLevel: SparkSetting, listNullability:
NullabilityEnum.Nullability, targetLevel: SparkSetting, itemsNullability:
NullabilityEnum.Nullability)
+
+ /**
+ * Here we are generating all possible combinations of settings, including
default.
+ **/
+ def allPossibleCombinations: java.util.stream.Stream[TestScenario] = {
+ val allPossibleCombinations = for (
+ initialLevel <- cases;
+ listNullability <- NullabilityEnum.values.toSeq;
+ targetLevel <- cases;
+ itemsNullability <- NullabilityEnum.values.toSeq
+ ) yield TestScenario(initialLevel, listNullability, targetLevel,
itemsNullability)
+ allPossibleCombinations.filter {
+ case c => {
+ val notAllowedNulls = Seq(TwoLevel, Default)
+ // It's not allowed to have NULLs inside lists for 2 level lists (this matches the explicit setting or the default).
+ !(c.itemsNullability == Nullable &&
(notAllowedNulls.contains(c.targetLevel) ||
notAllowedNulls.contains(c.initialLevel)))
+ }
+ }.asJava.stream()
+ }
+
+ def selectedCombinations: java.util.stream.Stream[TestScenario] = {
+ Seq(
+ // This scenario leads to silent dataloss mentioned here -
https://github.com/apache/hudi/pull/11450 - basically all arrays
+ // which are not updated in the incoming batch are set to null.
+ TestScenario(initialLevel = TwoLevel, listNullability = Nullable,
targetLevel = ThreeLevel, itemsNullability = NotNullable),
+ // This scenario leads to exception mentioned here
https://github.com/apache/hudi/pull/11450 - the only difference with silent
dataloss
+ // is that writer does not allow wrongly-read null to be written into
new file, so write fails.
+ TestScenario(initialLevel = TwoLevel, listNullability = NotNullable,
targetLevel = ThreeLevel, itemsNullability = NotNullable),
+ // This is reverse version of scenario TwoLevel -> ThreeLevel with
nullable list value - leads to silent data loss.
+ TestScenario(initialLevel = ThreeLevel, listNullability = Nullable,
targetLevel = TwoLevel, itemsNullability = NotNullable),
+ // This is reverse version of scenario TwoLevel -> ThreeLevel with not
nullable list value - leads to exception.
+ TestScenario(initialLevel = ThreeLevel, listNullability = NotNullable,
targetLevel = TwoLevel, itemsNullability = NotNullable)
+ ).asJava.stream()
+ }
+ def testSource: java.util.stream.Stream[TestScenario] = if(runAllPossible) {
+ allPossibleCombinations
+ } else {
+ selectedCombinations
+ }
+
+ /**
+ * Set this to false to run only the selected (highlighted) scenarios instead of all combinations.
+ **/
+ def runAllPossible = true
+}
+
+/**
+ * Ensure that, after the switch from reading a file with the schema it was written with
+ * to the deduced schema (RFC 46), different list levels can interoperate.
+ **/
+class TestParquetReaderCompatibility extends HoodieSparkWriterTestBase {
+ /*
+ * Generate a schema with the required nullability constraints.
+ * The interesting part is that if the list is the last element in the schema,
+ * different errors will be thrown.
+ **/
+ private def getSchemaWithParameters(listNullability: Nullability,
listElementNullability: Nullability): StructType = {
+ val listNullable = listNullability == Nullable
+ val listElementsNullable = listElementNullability == Nullable
+ val schema = StructType(Array(
+ StructField("key", LongType, nullable = false),
+ StructField("partition", StringType, nullable = false),
+ StructField(TestParquetReaderCompatibility.listFieldName,
ArrayType(LongType, listElementsNullable), listNullable),
+ StructField("ts", LongType, nullable = false)
+ ))
+ schema
+ }
+ private def defaultPartition = "p1"
+
+ private def generateRowsWithParameters(listNullability: Nullability,
listElementNullability: Nullability, combineValue: Long = 1L, dummyCount: Int =
10): Map[Long, Row] = {
+ val listNullable = listNullability == Nullable
+ val listElementsNullable = listElementNullability == Nullable
+ val res = mutable.Map[Long, Row]()
+ var key = 1L
+ for (_ <- 1 to dummyCount) {
+ res += key -> Row(key, defaultPartition, Seq(100L), combineValue)
+ key += 1
+ }
+ res += key -> Row(key, defaultPartition, Seq(1L, 2L), combineValue)
+ key += 1
+ if (listNullable) {
+ res += key -> Row(key, defaultPartition, null, combineValue)
+ key += 1
+ }
+ if (listElementsNullable) {
+ res += key -> Row(key, defaultPartition, Seq(1L, null), combineValue)
+ key += 1
+ }
+ res.toMap
+ }
+
+ private def createSparkSessionWithListLevel(listType: SparkSetting):
SparkSession = {
+ val conf = new SparkConf()
+ listType.overrideConf(conf)
+ val spark = SparkSession.builder()
+ .config(HoodieClientTestUtils.getSparkConfForTest("hoodie_test"))
+ .config(conf)
+ .getOrCreate()
+ spark
+ }
+
+ /**
+ * Test interoperability of different parquet list types and their
nullability.
+ **/
+ @ParameterizedTest
+ @MethodSource(Array("testSource"))
+ def testAvroListUpdate(input: TestScenario): Unit = {
+ spark.stop()
+ val path = tempBasePath + "_avro_list_update"
+ val options = Map(
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "key",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ "path" -> path
+ )
+ val initialLevel = input.initialLevel
+ val listNullability = input.listNullability
+ val targetLevel = input.targetLevel
+ val itemsNullability = input.itemsNullability
+ val structType = getSchemaWithParameters(listNullability, itemsNullability)
+ val initialRecords = generateRowsWithParameters(listNullability,
itemsNullability)
+
+ val firstWriteSession = createSparkSessionWithListLevel(initialLevel)
+ try {
+ HoodieSparkSqlWriter.write(
+ firstWriteSession.sqlContext,
+ SaveMode.Overwrite,
+ options,
+
firstWriteSession.createDataFrame(firstWriteSession.sparkContext.parallelize(initialRecords.values.toSeq),
structType)
+ )
+
+ val firstWriteLevels = getListLevelsFromPath(firstWriteSession, path)
+ assert(firstWriteLevels.size == 1, s"Expected only one level, got
$firstWriteLevels")
+ assert(firstWriteLevels.head == initialLevel.value, s"Expected level
$initialLevel, got $firstWriteLevels")
+ } finally {
+ firstWriteSession.close()
+ }
+
+ val updateRecords = generateRowsWithParameters(listNullability,
itemsNullability, 2L, 1)
+ val secondWriteSession = createSparkSessionWithListLevel(targetLevel)
+ var expectedRecordsWithSchema: Seq[Row] = Seq()
+ try {
+ HoodieSparkSqlWriter.write(
+ secondWriteSession.sqlContext,
+ SaveMode.Append,
+ options,
+
secondWriteSession.createDataFrame(secondWriteSession.sparkContext.parallelize(updateRecords.values.toSeq),
structType)
+ )
+ val secondWriteLevels = getListLevelsFromPath(secondWriteSession, path)
+ assert(secondWriteLevels.size == 1, s"Expected only one level, got
$secondWriteLevels")
+ assert(secondWriteLevels.head == targetLevel.value, s"Expected level
$targetLevel, got $secondWriteLevels")
+
+ val expectedRecords = (initialRecords ++ updateRecords).values.toSeq
+ expectedRecordsWithSchema = dropMetaFields(
+
secondWriteSession.createDataFrame(secondWriteSession.sparkContext.parallelize(expectedRecords),
structType)
+ ).collect().toSeq
+ } finally {
+ secondWriteSession.close()
+ }
+
+ val readSessionWithInitLevel =
createSparkSessionWithListLevel(initialLevel)
+ try {
+ compareResults(expectedRecordsWithSchema, readSessionWithInitLevel, path)
+ } finally {
+ readSessionWithInitLevel.close()
+ }
+
+ val readSessionWithTargetLevel =
createSparkSessionWithListLevel(targetLevel)
+ try {
+ compareResults(expectedRecordsWithSchema, readSessionWithTargetLevel,
path)
+ } finally {
+ readSessionWithTargetLevel.close()
+ }
+
+ initSparkContext()
+ }
+
+ /**
+ * For some reason order of fields is different,
+ * so produces difference like
+ * Difference: Expected [2,p1,WrappedArray(1, 2),2], got [2,WrappedArray(1,
2),2,p1]
+ * Difference: Expected [3,p1,WrappedArray(1, null),2], got
[3,WrappedArray(1, null),2,p1]
+ * So using manual comparison by ensuring length is the same, then
extracting fields by names and comparing them.
+ * This will not work for nested structs, but it's a simple test.
+ */
+ def compareIndividualRows(first: Row, second: Row): Boolean = {
+ if (first.length != second.length) {
+ false
+ } else {
+ first.schema.fieldNames.forall { field =>
+ val firstIndex = first.fieldIndex(field)
+ val secondIndex = second.fieldIndex(field)
+ first.get(firstIndex) == second.get(secondIndex)
+ }
+ }
+ }
+
+ private def compareResults(expectedRecords: Seq[Row], sparkSession:
SparkSession, path: String): Unit = {
+ implicit object RowOrdering extends Ordering[Row] {
+ def compare(a: Row, b: Row): Int = {
+ val firstId = a.getLong(a.fieldIndex("key"))
+ val secondId = b.getLong(b.fieldIndex("key"))
+ firstId.compareTo(secondId)
+ }
+ }
+ val expectedSorted = expectedRecords.sorted
+ val readRecords =
dropMetaFields(sparkSession.read.format("hudi").load(path)).collect().toSeq.sorted
+ assert(readRecords.length == expectedSorted.length, s"Expected
${expectedSorted.length} records, got ${readRecords.length}")
+ val recordsEqual = readRecords.zip(expectedSorted).forall {
+ case (first, second) => compareIndividualRows(first, second)
+ }
+ val explanationStr = if (!recordsEqual) {
+ readRecords.zipWithIndex.map {
+ case (row, index) => {
+ val expectedRow = expectedSorted(index)
+ if (row != expectedRow) {
+ s"Difference: Expected $expectedRow, got $row"
+ } else {
+ s"Equals: expected $expectedRow, got $row"
+ }
+ }
+ }.mkString("\n")
+ } else {
+ ""
+ }
+ assert(recordsEqual, explanationStr)
+ }
+
+ private def getListLevelsFromPath(spark: SparkSession, path: String):
Set[String] = {
+ val engineContext = new HoodieSparkEngineContext(spark.sparkContext,
spark.sqlContext)
+ val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+ val baseTableMetadata = new HoodieBackedTableMetadata(
+ engineContext, HoodieTestUtils.getDefaultStorage, metadataConfig,
s"$path", false)
+ val fileStatuses =
baseTableMetadata.getAllFilesInPartitions(Collections.singletonList(s"$path/$defaultPartition"))
+
+ fileStatuses.asScala.flatMap(_._2.asScala).map(_.getPath).map(path =>
getListType(spark.sparkContext.hadoopConfiguration, path)).toSet
+ }
+
+ private def getListType(hadoopConf: Configuration, path: StoragePath):
String = {
+ val reader = HoodieIOFactory.getIOFactory(new HoodieHadoopStorage(path,
new
HadoopStorageConfiguration(hadoopConf))).getReaderFactory(HoodieRecordType.AVRO).getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER,
path)
+ val schema =
ParquetTableSchemaResolver.convertAvroSchemaToParquet(reader.getSchema,
hadoopConf)
+
+ val list = schema.getFields.asScala.find(_.getName ==
TestParquetReaderCompatibility.listFieldName).get
+ val groupType = list.asGroupType()
+ val originalType = groupType.getOriginalType
+ val isThreeLevel = originalType == OriginalType.LIST &&
!(groupType.getType(0).getName == "array")
+
+ if (isThreeLevel) {
+ ThreeLevel.value
+ } else {
+ TwoLevel.value
+ }
+ }
+
+}