This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bcb974b4813 [HUDI-7034] Fix refresh table/view (#10151)
bcb974b4813 is described below
commit bcb974b48133273ec0ef3e33b91889c61609e8b2
Author: VitoMakarevich <[email protected]>
AuthorDate: Thu Nov 23 11:22:14 2023 +0100
[HUDI-7034] Fix refresh table/view (#10151)
* [HUDI-7034] Refresh index fix - remove cached file slices within
partitions
---------
Co-authored-by: vmakarevich <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 2 +
.../org/apache/hudi/TestHoodieFileIndex.scala | 63 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index e697f385e04..824a94abab4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -428,6 +428,8 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
// Reset it to null to trigger re-loading of all partition path
this.cachedAllPartitionPaths = null;
+ // Reset to force reload file slices inside partitions
+ this.cachedAllInputFileSlices = new HashMap<>();
if (!shouldListLazily) {
ensurePreloadedPartitions(getAllQueryPartitionPaths());
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a88d263e9dc..803702addb4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -29,7 +29,7 @@ import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU
import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig}
import org.apache.hudi.common.engine.EngineType
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord,
HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
@@ -240,6 +240,67 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
}
@ParameterizedTest
@CsvSource(value = Array("lazy,true", "lazy,false", "eager,true", "eager,false"))
def testIndexRefreshesFileSlices(listingModeOverride: String, useMetadataTable: Boolean): Unit = {
  // Collect the distinct commit times of every base file currently visible through the index.
  def distinctCommitTimes(dirs: Seq[PartitionDirectory]): Seq[String] = {
    dirs
      .flatMap(_.files)
      .map(status => new HoodieBaseFile(status.getPath.toString))
      .map(_.getCommitTime)
      .distinct
  }

  val random = new Random(0xDEED)
  // partition column values are [0, 5)
  val initialRows = for (i <- 1 to 1000) yield (random.nextString(1000), random.nextInt(5), random.nextString(1000))

  val writeOpts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString)
  val _spark = spark
  import _spark.implicits._

  // First write: bootstrap the table with records spread over all partitions.
  initialRows.toDF("_row_key", "partition", "timestamp")
    .write
    .format("hudi")
    .options(writeOpts)
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)

  val readOpts = queryOpts ++ Map(
    HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
    DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> listingModeOverride
  )

  metaClient = HoodieTableMetaClient.reload(metaClient)
  val fileIndex = HoodieFileIndex(spark, metaClient, None, readOpts)

  // Everything listed by the index must belong to the single completed commit.
  val filesAfterFirstWrite = fileIndex.listFiles(Nil, Nil)
  val commitTimesAfterFirstWrite = distinctCommitTimes(filesAfterFirstWrite)
  val firstWriteCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
  assertEquals(1, commitTimesAfterFirstWrite.size, "Should have only one commit")
  assertEquals(firstWriteCommitTime, commitTimesAfterFirstWrite.head, "All files should be of first commit")

  // Second write: upsert one record per partition so every base file is rewritten.
  val secondBatchRows = for (i <- 0 to 4) yield (random.nextString(1000), i, random.nextString(1000))

  secondBatchRows.toDF("_row_key", "partition", "timestamp")
    .write
    .format("hudi")
    .options(writeOpts)
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(basePath)

  // After refresh() the index must drop its cached file slices and pick up the new commit.
  fileIndex.refresh()
  val filesAfterSecondWrite = fileIndex.listFiles(Nil, Nil)
  val commitTimesAfterSecondWrite = distinctCommitTimes(filesAfterSecondWrite)
  metaClient = HoodieTableMetaClient.reload(metaClient)
  val lastCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

  assertEquals(1, commitTimesAfterSecondWrite.size, "All basefiles affected so all have same commit time")
  assertEquals(lastCommitTime, commitTimesAfterSecondWrite.head, "All files should be of second commit after index refresh")
}
+
@ParameterizedTest
@CsvSource(value = Array("lazy,true,true", "lazy,true,false",
"lazy,false,true", "lazy,false,false",
"eager,true,true", "eager,true,false", "eager,false,true",
"eager,false,false"))