This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9f8d4d0130d [HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)
9f8d4d0130d is described below
commit 9f8d4d0130dbe78598f24f00e7fa75c13737fc79
Author: Nicolas Paris <[email protected]>
AuthorDate: Fri Apr 28 07:23:46 2023 +0200
[HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)
Co-authored-by: Danny Chan <[email protected]>
---
.../apache/spark/sql/HoodieInternalRowUtils.scala | 5 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 101 ++++++++++++++-------
2 files changed, 73 insertions(+), 33 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 3ea801177fb..b56b0b1e4ce 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -188,7 +188,10 @@ object HoodieInternalRowUtils {
null
}
- fieldWriters(pos)(fieldUpdater, pos, prevValue)
+ if(prevValue == null)
+ fieldUpdater.setNullAt(pos)
+ else
+ fieldWriters(pos)(fieldUpdater, pos, prevValue)
pos += 1
}
}
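The change above guards the per-field write: when the previous row's value is null (for example a null map or array column), the field writer would otherwise try to copy the value's entries and throw a NullPointerException, so the updater now marks the target position as null instead. A minimal sketch of the pattern, using hypothetical FieldUpdater/FieldWriter stand-ins rather than Hudi's actual types:

object NullGuardSketch {
  // Hypothetical stand-in for Hudi's row field updater.
  trait FieldUpdater {
    def set(pos: Int, value: Any): Unit
    def setNullAt(pos: Int): Unit
  }
  // A per-field writer copies a non-null value into the target row.
  type FieldWriter = (FieldUpdater, Int, Any) => Unit

  def writeFields(prevValues: Array[Any],
                  fieldWriters: Array[FieldWriter],
                  fieldUpdater: FieldUpdater): Unit = {
    var pos = 0
    while (pos < prevValues.length) {
      val prevValue = prevValues(pos)
      if (prevValue == null) {
        // Mark the slot as null instead of invoking the writer,
        // which would dereference a null map/array value.
        fieldUpdater.setNullAt(pos)
      } else {
        fieldWriters(pos)(fieldUpdater, pos, prevValue)
      }
      pos += 1
    }
  }
}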
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ae1f62b7e61..6b1773807fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -60,6 +60,8 @@ import java.sql.{Date, Timestamp}
import java.util.function.Consumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import org.junit.jupiter.api.Assertions.assertDoesNotThrow
+import org.junit.jupiter.api.function.Executable
/**
@@ -151,22 +153,22 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
@Test
def testInferPartitionBy(): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
- // Insert Operation
- val records = recordsToStrings(dataGen.generateInserts("000",
100)).toList
- val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
- val commonOptsNoPreCombine = Map(
- "hoodie.insert.shuffle.parallelism" -> "4",
- "hoodie.upsert.shuffle.parallelism" -> "4",
- DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
- ) ++ writeOpts
+ val commonOptsNoPreCombine = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ ) ++ writeOpts
- inputDF.write.partitionBy("partition").format("hudi")
- .options(commonOptsNoPreCombine)
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
+ inputDF.write.partitionBy("partition").format("hudi")
+ .options(commonOptsNoPreCombine)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
snapshot0.cache()
@@ -195,10 +197,10 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
val records2 = recordsToStrings(dataGen.generateInserts("000", 200)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
// hard code the value for rider and fare so that we can verify the partitions paths with hudi
- val toInsertDf = inputDF1.withColumn("fare",lit(100)).withColumn("rider",lit("rider-123"))
-   .union(inputDF2.withColumn("fare",lit(200)).withColumn("rider",lit("rider-456")))
+ val toInsertDf = inputDF1.withColumn("fare", lit(100)).withColumn("rider", lit("rider-123"))
+   .union(inputDF2.withColumn("fare", lit(200)).withColumn("rider", lit("rider-456")))
- toInsertDf.write.partitionBy("fare","rider").format("hudi")
+ toInsertDf.write.partitionBy("fare", "rider").format("hudi")
.options(commonOptsNoPreCombine)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
@@ -217,7 +219,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(snapshot1.filter("_hoodie_partition_path = '200/rider-456'").count(), 200)
// triggering 2nd batch to ensure table config validation does not fail.
- toInsertDf.write.partitionBy("fare","rider").format("hudi")
+ toInsertDf.write.partitionBy("fare", "rider").format("hudi")
.options(commonOptsNoPreCombine)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -346,7 +348,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
}
- private def writeToHudi(opts: Map[String, String], df: Dataset[Row]) : Unit = {
+ private def writeToHudi(opts: Map[String, String], df: Dataset[Row]): Unit = {
df.write.format("hudi")
.options(opts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -584,7 +586,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.save(basePath)
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
- .setLoadActiveTimelineOnLoad(true).build();
+ .setLoadActiveTimelineOnLoad(true).build();
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
@@ -663,7 +665,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.save(basePath)
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
- .setLoadActiveTimelineOnLoad(true).build()
+ .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
@@ -722,7 +724,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(7, filterSecondPartitionCount)
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
- .setLoadActiveTimelineOnLoad(true).build()
+ .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(3, commits.size)
@@ -777,7 +779,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(7, filterSecondPartitionCount)
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
- .setLoadActiveTimelineOnLoad(true).build()
+ .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(2, commits.size)
@@ -795,7 +797,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
val insert2NewKeyCnt = 2
val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
- val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+ val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
val inserts1 = allRecords.subList(0, insert1Cnt)
val inserts2New = dataGen.generateSameKeyInserts("002",
allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
val inserts2Dup = dataGen.generateSameKeyInserts("002",
inserts1.subList(0, insert2DupKeyCnt))
@@ -847,7 +849,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
Row("22", "lisi", Timestamp.valueOf("1970-01-02 13:31:24"),
Date.valueOf("1991-11-08"), BigDecimal.valueOf(2.0), 11, 1),
Row("33", "zhangsan", Timestamp.valueOf("1970-01-03 13:31:24"),
Date.valueOf("1991-11-09"), BigDecimal.valueOf(3.0), 11, 1))
val rdd = jsc.parallelize(records)
- val recordsDF = spark.createDataFrame(rdd, schema)
+ val recordsDF = spark.createDataFrame(rdd, schema)
recordsDF.write.format("org.apache.hudi")
.options(writeOpts)
.mode(SaveMode.Overwrite)
@@ -1392,7 +1394,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
val dataGenerator = new QuickstartUtils.DataGenerator()
- val records = convertToStringList(dataGenerator.generateInserts( 10))
+ val records = convertToStringList(dataGenerator.generateInserts(10))
val recordsRDD = spark.sparkContext.parallelize(records, 2)
val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
inputDF.write.format("hudi")
@@ -1411,6 +1413,41 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
+ def testMapArrayTypeSchemaEvolution(recordType: HoodieRecordType): Unit = {
+ assertDoesNotThrow(
+ new Executable {
+ override def execute(): Unit = {
+ val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
+
+ val schema1 = StructType(
+ StructField("_row_key", StringType, nullable = false) ::
+ StructField("name", MapType(StringType,
+ ArrayType(StringType, containsNull = false)), nullable = true) ::
+ StructField("timestamp", LongType, nullable = true) ::
+ StructField("partition", LongType, nullable = true) :: Nil)
+ val records = List(Row("1", null, 1L, 1L))
+ val inputDF = spark.createDataFrame(spark.sparkContext.parallelize(records, 2), schema1)
+ inputDF.write.format("org.apache.hudi")
+ .options(commonOpts ++ writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val schema2 = StructType(StructField("_row_key", StringType, nullable =
false) ::
+ StructField("name", MapType(StringType, ArrayType(StringType,
+ containsNull = true)), nullable = true) ::
+ StructField("timestamp", LongType, nullable = true) ::
+ StructField("partition", LongType, nullable = true) :: Nil)
+ val records2 = List(Row("1", null, 1L, 1L))
+ val inputDF2 = spark.createDataFrame(spark.sparkContext.parallelize(records2, 2), schema2)
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts ++ writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }})
+ }
+
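The new testMapArrayTypeSchemaEvolution test exercises the guarded path: it first writes a row whose map-of-arrays column is null, then writes the same record key again under a schema whose nested containsNull flag has changed, so the merge on the second write has to rewrite a stored row carrying a null map value. assertDoesNotThrow verifies that this sequence no longer fails with the NullPointerException fixed above.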
/**
* Validates that clustering dag is triggered only once.
* We leverage spark event listener to validate it.
@@ -1421,7 +1458,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering")
spark.sparkContext.addSparkListener(sm)
- var structType : StructType = null
+ var structType: StructType = null
for (i <- 1 to 2) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i),
100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
@@ -1429,7 +1466,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
inputDF.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
- .option("hoodie.metadata.enable","false")
+ .option("hoodie.metadata.enable", "false")
.mode(if (i == 0) SaveMode.Overwrite else SaveMode.Append)
.save(basePath)
}
@@ -1444,7 +1481,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "2")
- .option("hoodie.metadata.enable","false")
+ .option("hoodie.metadata.enable", "false")
.mode(SaveMode.Append)
.save(basePath)
@@ -1466,8 +1503,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
def getWriterReaderOptsLessPartitionPath(recordType: HoodieRecordType,
- opt: Map[String, String] = commonOpts,
- enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
+ opt: Map[String, String] = commonOpts,
+ enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
(Map[String, String], Map[String, String]) = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType, opt, enableFileIndex)
(writeOpts.-(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), readOpts)
@@ -1483,7 +1520,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
}
- /************** Stage Event Listener **************/
+ /** ************ Stage Event Listener ************* */
class StageEventManager(eventToTrack: String) extends SparkListener() {
var triggerCount = 0