This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb974b4813 [HUDI-7034] Fix refresh table/view (#10151)
bcb974b4813 is described below

commit bcb974b48133273ec0ef3e33b91889c61609e8b2
Author: VitoMakarevich <[email protected]>
AuthorDate: Thu Nov 23 11:22:14 2023 +0100

    [HUDI-7034] Fix refresh table/view (#10151)
    
    * [HUDI-7034] Refresh index fix - remove cached file slices within partitions
    
    ---------
    
    Co-authored-by: vmakarevich <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  2 +
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 63 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index e697f385e04..824a94abab4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -428,6 +428,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 
     // Reset it to null to trigger re-loading of all partition path
     this.cachedAllPartitionPaths = null;
+    // Reset to force reload file slices inside partitions
+    this.cachedAllInputFileSlices = new HashMap<>();
     if (!shouldListLazily) {
       ensurePreloadedPartitions(getAllQueryPartitionPaths());
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a88d263e9dc..803702addb4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
 import org.apache.hudi.common.engine.EngineType
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord, 
HoodieTableType}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
@@ -240,6 +240,67 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
     assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array("lazy,true", "lazy,false",
+    "eager,true", "eager,false"))
+  def testIndexRefreshesFileSlices(listingModeOverride: String,
+                                   useMetadataTable: Boolean): Unit = {
+    def getDistinctCommitTimeFromAllFilesInIndex(files: 
Seq[PartitionDirectory]): Seq[String] = {
+      files.flatMap(_.files).map(fileStatus => new 
HoodieBaseFile(fileStatus.getPath.toString)).map(_.getCommitTime).distinct
+    }
+
+    val r = new Random(0xDEED)
+    // partition column values are [0, 5)
+    val tuples = for (i <- 1 to 1000) yield (r.nextString(1000), r.nextInt(5), 
r.nextString(1000))
+
+    val writeOpts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> 
useMetadataTable.toString)
+    val _spark = spark
+    import _spark.implicits._
+    val inputDF = tuples.toDF("_row_key", "partition", "timestamp")
+    inputDF
+      .write
+      .format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val readOpts = queryOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
+      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> 
listingModeOverride
+    )
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndexFirstWrite = HoodieFileIndex(spark, metaClient, None, 
readOpts)
+
+    val listFilesAfterFirstWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
+    val distinctListOfCommitTimesAfterFirstWrite = 
getDistinctCommitTimeFromAllFilesInIndex(listFilesAfterFirstWrite)
+    val firstWriteCommitTime = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    assertEquals(1, distinctListOfCommitTimesAfterFirstWrite.size, "Should 
have only one commit")
+    assertEquals(firstWriteCommitTime, 
distinctListOfCommitTimesAfterFirstWrite.head, "All files should belong to the 
first existing commit")
+
+    val nextBatch = for (
+      i <- 0 to 4
+    ) yield(r.nextString(1000), i, r.nextString(1000))
+
+    nextBatch.toDF("_row_key", "partition", "timestamp")
+      .write
+      .format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    fileIndexFirstWrite.refresh()
+    val fileSlicesAfterSecondWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
+    val distinctListOfCommitTimesAfterSecondWrite = 
getDistinctCommitTimeFromAllFilesInIndex(fileSlicesAfterSecondWrite)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val lastCommitTime = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    assertEquals(1, distinctListOfCommitTimesAfterSecondWrite.size, "All 
basefiles affected so all have same commit time")
+    assertEquals(lastCommitTime, 
distinctListOfCommitTimesAfterSecondWrite.head, "All files should be of second 
commit after index refresh")
+  }
+
   @ParameterizedTest
   @CsvSource(value = Array("lazy,true,true", "lazy,true,false", 
"lazy,false,true", "lazy,false,false",
     "eager,true,true", "eager,true,false", "eager,false,true", 
"eager,false,false"))

Reply via email to