This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f8cafbbba1e [HUDI-6412] Handle record index payload update (#9036)
f8cafbbba1e is described below

commit f8cafbbba1ea8543b90eec559f968ced53b463ed
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jun 23 12:49:11 2023 +0530

    [HUDI-6412] Handle record index payload update (#9036)
---
 .../functional/TestHoodieBackedMetadata.java       |   4 +
 .../hudi/metadata/HoodieMetadataPayload.java       |  11 +-
 .../hudi/functional/TestMetadataRecordIndex.scala  | 212 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 647ae107f4c..b50c002468f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1441,6 +1441,10 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
             .withEnableRecordIndex(true)
             .withRecordIndexFileGroupCount(5, 5)
             .build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(2)
+            .build())
         .build();
 
     init(COPY_ON_WRITE, writeConfig);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index eb8794a64c7..888833b45a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -419,16 +419,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
       case METADATA_TYPE_COLUMN_STATS:
        return new HoodieMetadataPayload(key, combineColumnStatsMetadata(previousRecord));
       case METADATA_TYPE_RECORD_INDEX:
-        // TODO: does not work with updates
-        if (previousRecord.recordIndexMetadata.getInstantTime() != recordIndexMetadata.getInstantTime()) {
-          throw new HoodieMetadataException(String.format("InstantTime for %s should not change from %s to %s", previousRecord.key,
-              previousRecord, this));
-        }
-        // TODO: This does not work with clustering
-        if (!previousRecord.getRecordGlobalLocation().equals(getRecordGlobalLocation())) {
-          throw new HoodieMetadataException(String.format("Location for %s should not change from %s to %s", previousRecord.key,
-              previousRecord, this));
-        }
+        // No need to merge with previous record index, always pick the latest payload.
         return this;
       default:
          throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
new file mode 100644
index 00000000000..0f716e18951
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.spark.sql._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.Properties
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@Tag("functional")
+class TestMetadataRecordIndex extends HoodieSparkClientTestBase {
+  var spark: SparkSession = _
+  var instantTime: AtomicInteger = _
+  val metadataOpts = Map(
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
+  )
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    RECORDKEY_FIELD.key -> "_row_key",
+    PARTITIONPATH_FIELD.key -> "partition",
+    PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+  ) ++ metadataOpts
+  var mergedDfList: List[DataFrame] = List.empty
+
+  @BeforeEach
+  override def setUp() {
+    initPath()
+    initSparkContexts()
+    initFileSystem()
+    initTestDataGenerator()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    instantTime = new AtomicInteger(1)
+
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown() = {
+    cleanupFileSystem()
+    cleanupSparkContexts()
+  }
+
+  @Test
+  def testClusteringWithRecordIndex(): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
+    )
+
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    val lastClusteringInstant = getLatestClusteringInstant()
+    assertTrue(lastClusteringInstant.isPresent)
+
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    assertTrue(getLatestClusteringInstant().get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
+    validateDataAndRecordIndices(hudiOpts)
+  }
+
+  private def getLatestClusteringInstant(): org.apache.hudi.common.util.Option[HoodieInstant] = {
+    metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
+  }
+
+  private def doWriteAndValidateDataAndRecordIndex(hudiOpts: Map[String, String],
+                                                   operation: String,
+                                                   saveMode: SaveMode,
+                                                   validate: Boolean = true): DataFrame = {
+    var records1: mutable.Buffer[String] = null
+    if (operation == UPSERT_OPERATION_OPT_VAL) {
+      val instantTime = getInstantTime()
+      val records = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 20))
+      records.addAll(recordsToStrings(dataGen.generateInserts(instantTime, 20)))
+      records1 = records.asScala
+    } else {
+      records1 = recordsToStrings(dataGen.generateInserts(getInstantTime(), 100)).asScala
+    }
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .option(OPERATION.key, operation)
+      .mode(saveMode)
+      .save(basePath)
+    calculateMergedDf(inputDF1)
+    if (validate) {
+      validateDataAndRecordIndices(hudiOpts)
+    }
+    inputDF1
+  }
+
+  def calculateMergedDf(inputDF1: DataFrame): Unit = {
+    val prevDfOpt = mergedDfList.lastOption
+    if (prevDfOpt.isEmpty) {
+      mergedDfList = mergedDfList :+ inputDF1
+    } else {
+      val prevDf = prevDfOpt.get
+      val prevDfOld = prevDf.join(inputDF1, prevDf("_row_key") === inputDF1("_row_key")
+        && prevDf("partition") === inputDF1("partition"), "leftanti")
+      prevDfOld.show(500, false)
+      val unionDf = prevDfOld.union(inputDF1)
+      unionDf.show(500, false)
+      mergedDfList = mergedDfList :+ unionDf
+    }
+  }
+
+  private def getInstantTime(): String = {
+    String.format("%03d", new Integer(instantTime.getAndIncrement()))
+  }
+
+  private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+    val props = new Properties()
+    props.putAll(hudiOpts.asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+  }
+
+  def getFileGroupCountForRecordIndex(writeConfig: HoodieWriteConfig): Long = {
+    val tableMetadata = getHoodieTable(metaClient, writeConfig).getMetadataTable
+    tableMetadata.asInstanceOf[HoodieBackedTableMetadata].getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX)
+  }
+
+  private def validateDataAndRecordIndices(hudiOpts: Map[String, String]): Unit = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val writeConfig = getWriteConfig(hudiOpts)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+    val readDf = spark.read.format("hudi").load(basePath)
+    val rowArr = readDf.collect()
+    val recordIndexMap = metadata.readRecordIndex(
+      rowArr.map(row => row.getAs("_hoodie_record_key").toString).toList.asJava)
+
+    assertTrue(rowArr.length > 0)
+    for (row <- rowArr) {
+      val recordKey: String = row.getAs("_hoodie_record_key")
+      val partitionPath: String = row.getAs("_hoodie_partition_path")
+      val fileName: String = row.getAs("_hoodie_file_name")
+      val recordLocation = recordIndexMap.get(recordKey)
+      assertEquals(partitionPath, recordLocation.getPartitionPath)
+      if (!writeConfig.inlineClusteringEnabled && !writeConfig.isAsyncClusteringEnabled) {
+        // The file id changes after clustering, so only assert it for usual upsert and compaction operations
+        assertTrue(fileName.contains(recordLocation.getFileId), fileName + " does not contain " + recordLocation.getFileId)
+      }
+    }
+
+    assertEquals(rowArr.length, recordIndexMap.keySet.size)
+    val estimatedFileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
+      MetadataPartitionType.RECORD_INDEX, rowArr.length, 48,
+      writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount,
+      writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes)
+    assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig))
+    val prevDf = mergedDfList.last.drop("tip_history")
+    val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
+      "_hoodie_partition_path", "_hoodie_file_name", "tip_history")
+      .join(prevDf, prevDf.columns, "leftanti")
+    nonMatchingRecords.show(500, false)
+    assertEquals(0, nonMatchingRecords.count())
+    readDf.show(500, false)
+    prevDf.show(500, false)
+    assertEquals(readDf.count(), prevDf.count())
+  }
+}

Reply via email to