This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f8cafbbba1e [HUDI-6412] Handle record index payload update (#9036)
f8cafbbba1e is described below
commit f8cafbbba1ea8543b90eec559f968ced53b463ed
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jun 23 12:49:11 2023 +0530
[HUDI-6412] Handle record index payload update (#9036)
---
.../functional/TestHoodieBackedMetadata.java | 4 +
.../hudi/metadata/HoodieMetadataPayload.java | 11 +-
.../hudi/functional/TestMetadataRecordIndex.scala | 212 +++++++++++++++++++++
3 files changed, 217 insertions(+), 10 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 647ae107f4c..b50c002468f 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1441,6 +1441,10 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withEnableRecordIndex(true)
.withRecordIndexFileGroupCount(5, 5)
.build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(true)
+ .withInlineClusteringNumCommits(2)
+ .build())
.build();
init(COPY_ON_WRITE, writeConfig);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index eb8794a64c7..888833b45a0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -419,16 +419,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
case METADATA_TYPE_COLUMN_STATS:
return new HoodieMetadataPayload(key,
combineColumnStatsMetadata(previousRecord));
case METADATA_TYPE_RECORD_INDEX:
- // TODO: does not work with updates
- if (previousRecord.recordIndexMetadata.getInstantTime() !=
recordIndexMetadata.getInstantTime()) {
- throw new HoodieMetadataException(String.format("InstantTime for %s
should not change from %s to %s", previousRecord.key,
- previousRecord, this));
- }
- // TODO: This does not work with clustering
- if
(!previousRecord.getRecordGlobalLocation().equals(getRecordGlobalLocation())) {
- throw new HoodieMetadataException(String.format("Location for %s
should not change from %s to %s", previousRecord.key,
- previousRecord, this));
- }
+      // No need to merge with previous record index; always pick the latest
payload.
return this;
default:
throw new HoodieMetadataException("Unknown type of
HoodieMetadataPayload: " + type);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
new file mode 100644
index 00000000000..0f716e18951
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.spark.sql._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.Properties
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
@Tag("functional")
class TestMetadataRecordIndex extends HoodieSparkClientTestBase {
  var spark: SparkSession = _
  // Monotonically increasing counter used to derive unique, ordered instant times
  // for the generated record batches.
  var instantTime: AtomicInteger = _
  // Options enabling the metadata table and its record-level index partition.
  val metadataOpts = Map(
    HoodieMetadataConfig.ENABLE.key -> "true",
    HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
  )
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
    RECORDKEY_FIELD.key -> "_row_key",
    PARTITIONPATH_FIELD.key -> "partition",
    PRECOMBINE_FIELD.key -> "timestamp",
    HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
  ) ++ metadataOpts
  // Running merge of all written batches; the last element models the expected
  // current table state and is compared against a snapshot read in validation.
  var mergedDfList: List[DataFrame] = List.empty

  @BeforeEach
  override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    initFileSystem()
    initTestDataGenerator()

    setTableName("hoodie_test")
    initMetaClient()

    instantTime = new AtomicInteger(1)

    spark = sqlContext.sparkSession
  }

  @AfterEach
  override def tearDown(): Unit = {
    cleanupFileSystem()
    cleanupSparkContexts()
  }

  /**
   * Verifies the record index stays consistent across inline clustering: writes an
   * insert batch plus several upsert batches with inline clustering every 2 commits,
   * asserts clustering actually ran (twice), then validates data against the index.
   */
  @Test
  def testClusteringWithRecordIndex(): Unit = {
    val hudiOpts = commonOpts ++ Map(
      TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
      HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
    )

    doWriteAndValidateDataAndRecordIndex(hudiOpts,
      operation = INSERT_OPERATION_OPT_VAL,
      saveMode = SaveMode.Overwrite)
    doWriteAndValidateDataAndRecordIndex(hudiOpts,
      operation = UPSERT_OPERATION_OPT_VAL,
      saveMode = SaveMode.Append)

    // After two commits, inline clustering must have been triggered at least once.
    val lastClusteringInstant = getLatestClusteringInstant()
    assertTrue(lastClusteringInstant.isPresent)

    doWriteAndValidateDataAndRecordIndex(hudiOpts,
      operation = UPSERT_OPERATION_OPT_VAL,
      saveMode = SaveMode.Append)
    doWriteAndValidateDataAndRecordIndex(hudiOpts,
      operation = UPSERT_OPERATION_OPT_VAL,
      saveMode = SaveMode.Append)

    // Two more commits should have triggered a second, later clustering instant.
    assertTrue(getLatestClusteringInstant().get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
    validateDataAndRecordIndices(hudiOpts)
  }

  /** Latest completed replacecommit (clustering) instant, if any. */
  private def getLatestClusteringInstant(): org.apache.hudi.common.util.Option[HoodieInstant] = {
    metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
  }

  /**
   * Writes one batch of generated records with the given operation/save mode and,
   * unless `validate` is false, validates table data against the record index.
   *
   * @return the DataFrame that was written.
   */
  private def doWriteAndValidateDataAndRecordIndex(hudiOpts: Map[String, String],
                                                   operation: String,
                                                   saveMode: SaveMode,
                                                   validate: Boolean = true): DataFrame = {
    // Upserts mix 20 updates of existing keys with 20 fresh inserts, so the record
    // index must handle both relocated and brand-new keys; inserts write 100 records.
    val records: mutable.Buffer[String] =
      if (operation == UPSERT_OPERATION_OPT_VAL) {
        val commitTime = getInstantTime()
        val combined = recordsToStrings(dataGen.generateUniqueUpdates(commitTime, 20))
        combined.addAll(recordsToStrings(dataGen.generateInserts(commitTime, 20)))
        combined.asScala
      } else {
        recordsToStrings(dataGen.generateInserts(getInstantTime(), 100)).asScala
      }
    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
    inputDF.write.format("org.apache.hudi")
      .options(hudiOpts)
      .option(OPERATION.key, operation)
      .mode(saveMode)
      .save(basePath)
    calculateMergedDf(inputDF)
    if (validate) {
      validateDataAndRecordIndices(hudiOpts)
    }
    inputDF
  }

  /**
   * Appends to [[mergedDfList]] the expected table state after writing `inputDF`:
   * previous state minus rows whose (_row_key, partition) was rewritten, unioned
   * with the new batch.
   */
  def calculateMergedDf(inputDF1: DataFrame): Unit = {
    mergedDfList.lastOption match {
      case None =>
        // First batch: the input is the whole expected state.
        mergedDfList = mergedDfList :+ inputDF1
      case Some(prevDf) =>
        // Keep only previous rows NOT overwritten by this batch (anti-join on key+partition).
        val survivors = prevDf.join(inputDF1, prevDf("_row_key") === inputDF1("_row_key")
          && prevDf("partition") === inputDF1("partition"), "leftanti")
        survivors.show(500, false)
        val mergedDf = survivors.union(inputDF1)
        mergedDf.show(500, false)
        mergedDfList = mergedDfList :+ mergedDf
    }
  }

  /** Next zero-padded instant time, e.g. "001", "002", ... */
  private def getInstantTime(): String = {
    // Integer.valueOf avoids the `new Integer(int)` boxing constructor,
    // which has been deprecated since Java 9.
    String.format("%03d", Integer.valueOf(instantTime.getAndIncrement()))
  }

  /** Builds a HoodieWriteConfig for this table from the given string options. */
  private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
    val props = new Properties()
    props.putAll(hudiOpts.asJava)
    HoodieWriteConfig.newBuilder()
      .withProps(props)
      .withPath(basePath)
      .build()
  }

  /** Actual file-group count of the metadata table's record index partition. */
  def getFileGroupCountForRecordIndex(writeConfig: HoodieWriteConfig): Long = {
    val tableMetadata = getHoodieTable(metaClient, writeConfig).getMetadataTable
    tableMetadata.asInstanceOf[HoodieBackedTableMetadata].getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX)
  }

  /**
   * Cross-checks table data against the record index: every row's key must resolve
   * to its partition (and, when clustering is off, its file id); the index must
   * cover exactly the live rows; the index file-group count must match the
   * estimate; and the snapshot read must equal the expected merged DataFrame.
   */
  private def validateDataAndRecordIndices(hudiOpts: Map[String, String]): Unit = {
    metaClient = HoodieTableMetaClient.reload(metaClient)
    val writeConfig = getWriteConfig(hudiOpts)
    val metadata = metadataWriter(writeConfig).getTableMetadata
    val readDf = spark.read.format("hudi").load(basePath)
    val rowArr = readDf.collect()
    val recordIndexMap = metadata.readRecordIndex(
      rowArr.map(row => row.getAs("_hoodie_record_key").toString).toList.asJava)

    assertTrue(rowArr.length > 0)
    for (row <- rowArr) {
      val recordKey: String = row.getAs("_hoodie_record_key")
      val partitionPath: String = row.getAs("_hoodie_partition_path")
      val fileName: String = row.getAs("_hoodie_file_name")
      val recordLocation = recordIndexMap.get(recordKey)
      assertEquals(partitionPath, recordLocation.getPartitionPath)
      if (!writeConfig.inlineClusteringEnabled && !writeConfig.isAsyncClusteringEnabled) {
        // The file id changes after clustering, so only assert it for usual upsert and compaction operations
        assertTrue(fileName.contains(recordLocation.getFileId), fileName + " does not contain " + recordLocation.getFileId)
      }
    }

    // The index must contain exactly one entry per live record.
    assertEquals(rowArr.length, recordIndexMap.keySet.size)
    val estimatedFileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
      MetadataPartitionType.RECORD_INDEX, rowArr.length, 48,
      writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount,
      writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes)
    assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig))
    // Snapshot read (minus meta columns) must match the expected merged state exactly.
    val prevDf = mergedDfList.last.drop("tip_history")
    val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name", "tip_history")
      .join(prevDf, prevDf.columns, "leftanti")
    nonMatchingRecords.show(500, false)
    assertEquals(0, nonMatchingRecords.count())
    readDf.show(500, false)
    prevDf.show(500, false)
    assertEquals(readDf.count(), prevDf.count())
  }
}