This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cc95b03b5ef [HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
cc95b03b5ef is described below
commit cc95b03b5ef69ee585e0f781b6ad29cf5e770469
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Nov 3 10:21:04 2023 -0400
[HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
---
.../hudi/metadata/HoodieTableMetadataUtil.java | 2 +-
.../hudi/functional/TestRecordLevelIndex.scala | 36 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6502f8c246d..6808a0ef8dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1738,7 +1738,7 @@ public class HoodieTableMetadataUtil {
final String partition = partitionAndBaseFile.getKey();
final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
final String filename = baseFile.getFileName();
- Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
+ Path dataFilePath = new Path(basePath, StringUtils.isNullOrEmpty(partition) ? filename : (partition + Path.SEPARATOR) + filename);
final String fileId = baseFile.getFileId();
final String instantTime = baseFile.getCommitTime();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 17e3cadeeff..56866e7bf40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -26,13 +26,15 @@ import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.config._
import org.apache.hudi.exception.HoodieWriteConflictException
+import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
import org.apache.hudi.util.JavaConversions
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource}
import java.util.Collections
import java.util.concurrent.Executors
@@ -65,6 +67,18 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
saveMode = SaveMode.Append)
}
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testRLIUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
+ val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ }
+
@ParameterizedTest
@CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType, enableRowWriter: Boolean): Unit = {
@@ -335,12 +349,16 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
}
@ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
- def testEnableDisableRLI(tableType: HoodieTableType): Unit = {
+ @MethodSource(Array("testEnableDisableRLIParams"))
+ def testEnableDisableRLI(tableType: HoodieTableType, isPartitioned: Boolean): Unit = {
var hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()
)
+ if (!isPartitioned) {
+ hudiOpts = hudiOpts - PARTITIONPATH_FIELD.key
+ }
+
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
@@ -470,3 +488,15 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
validateDataAndRecordIndices(hudiOpts)
}
}
+
+object TestRecordLevelIndex {
+
+ def testEnableDisableRLIParams(): java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(false)),
+ arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(true)),
+ arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(false)),
+ arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(true))
+ )
+ }
+}