This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6d3311c34c8 [HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055)
6d3311c34c8 is described below
commit 6d3311c34c8548339aee8c3708fe50855a04afb8
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jul 10 01:24:44 2023 +0800
[HUDI-6443] Support delete_partition, insert_overwrite/table with record-level index (#9055)
- Support delete_partition, insert_overwrite and insert_overwrite_table
with record-level index (RLI). The metadata records should be updated accordingly:
  - all records in the deleted partition(s) should be deleted from RLI (for
    the delete_partition operation)
  - newly inserted records should be present in RLI
  - old records in the affected partitions should be removed from RLI
  - old records that happen to have the same record key as the new inserts
    won't be removed from RLI; their entries will be updated instead
---------
Co-authored-by: sivabalan <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 65 ++++++++++++++++++----
.../hudi/functional/TestRecordLevelIndex.scala | 28 +++++++++-
2 files changed, 80 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 97d1ce5e8b2..df73145a1bb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -42,7 +42,9 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -477,7 +479,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
+ partitions.size() + " partitions");
// Collect record keys from the files in parallel
- HoodieData<HoodieRecord> records =
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+ HoodieData<HoodieRecord> records =
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, false);
records.persist("MEMORY_AND_DISK_SER");
final long recordCount = records.count();
@@ -495,7 +497,8 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
* Read the record keys from base files in partitions and return records.
*/
private HoodieData<HoodieRecord>
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
-
List<Pair<String, String>> partitionBaseFilePairs) {
+
List<Pair<String, String>> partitionBaseFilePairs,
+ boolean
forDelete) {
if (partitionBaseFilePairs.isEmpty()) {
return engineContext.emptyHoodieData();
}
@@ -524,8 +527,9 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
@Override
public HoodieRecord next() {
- return
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId,
- instantTime);
+ return forDelete
+ ?
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
+ :
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId, instantTime);
}
};
});
@@ -872,9 +876,10 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
- if (writeStatus != null && !writeStatus.isEmpty()) {
- partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX,
getRecordIndexUpdates(writeStatus));
- }
+ HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpdates(writeStatus);
+ HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpdates(updatesFromWriteStatuses, commitMetadata);
+ partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX,
updatesFromWriteStatuses.union(additionalUpdates));
+
return partitionToRecordMap;
});
closeInternal();
@@ -884,7 +889,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
* Update from {@code HoodieCleanMetadata}.
*
* @param cleanMetadata {@code HoodieCleanMetadata}
- * @param instantTime Timestamp at which the clean was completed
+ * @param instantTime Timestamp at which the clean was completed
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
@@ -897,7 +902,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
* Update from {@code HoodieRestoreMetadata}.
*
* @param restoreMetadata {@code HoodieRestoreMetadata}
- * @param instantTime Timestamp at which the restore was performed
+ * @param instantTime Timestamp at which the restore was performed
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
@@ -911,7 +916,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
* Update from {@code HoodieRollbackMetadata}.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
- * @param instantTime Timestamp at which the rollback was performed
+ * @param instantTime Timestamp at which the rollback was performed
*/
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
@@ -1225,6 +1230,46 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
.filter(Objects::nonNull);
}
+ /**
+ * Reads the record keys of the partitions touched by a replace commit and turns each key into a
+ * record-index delete record.
+ *
+ * <p>The partitions come from {@code getPartitionToReplaceFileIds().keySet()}. For each partition, every
+ * latest base file in the file-system view is read, not only the file IDs listed as replaced.
+ * NOTE(review): only base files are scanned, so keys that exist solely in log files would not be
+ * returned. Confirm this cannot happen for tables using the record-level index.
+ *
+ * @param replaceCommitMetadata metadata of the replace commit whose partitions are scanned
+ * @return record-index delete records, one per record key read from the selected base files
+ */
+ private HoodieData<HoodieRecord>
getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata
replaceCommitMetadata) {
+ // View over the data table's active timeline, built with this writer's metadata reader field.
+ // NOTE(review): the view is never closed here. Check whether closing it would also close the
+ // shared metadata reader before adding a close().
+ final HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
+ dataMetaClient.getActiveTimeline(), metadata);
+ // (partition path, base file name) pairs for every latest base file of each replaced partition
+ List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata
+ .getPartitionToReplaceFileIds()
+ .keySet().stream().flatMap(partition
+ -> fsView.getLatestBaseFiles(partition).map(f ->
Pair.of(partition, f.getFileName())))
+ .collect(Collectors.toList());
+
+ // forDelete = true: readRecordKeysFromBaseFiles emits createRecordIndexDelete records
+ return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs,
true);
+ }
+
+ /**
+ * Returns the record-index records a commit needs in addition to those derived from its write statuses.
+ *
+ * <ul>
+ * <li>INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE: keys read from the replaced partitions that have no
+ * match in {@code updatesFromWriteStatuses} (a left anti join), returned as record-index deletes. Keys
+ * written again by this commit are dropped from the result, so their entries are updated by the
+ * write-status records instead of being deleted.</li>
+ * <li>DELETE_PARTITION: every key read from the deleted partition(s), returned as record-index deletes.</li>
+ * <li>Any other operation: an empty collection.</li>
+ * </ul>
+ *
+ * @param updatesFromWriteStatuses record-index records built from the commit's write statuses
+ * @param commitMetadata metadata of the commit. For the three operations above it is cast to
+ *                       {@link HoodieReplaceCommitMetadata}, which throws a ClassCastException if it
+ *                       is not one.
+ * @return the additional record-index records
+ */
+ private HoodieData<HoodieRecord>
getRecordIndexAdditionalUpdates(HoodieData<HoodieRecord>
updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
+ WriteOperationType operationType = commitMetadata.getOperationType();
+ if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+ // load existing records from replaced filegroups and left anti join overwriting records
+ // return partition-level unmatched records (with newLocation being null) to be deleted from RLI
+ // Joined on HoodieRecord#getKey() (the full HoodieKey), unlike the record-key join in the next branch.
+ // NOTE(review): if every RLI record's HoodieKey carries the same partition path, the two joins are
+ // equivalent. Confirm the intended "partition-level" vs "global" distinction.
+ return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)
commitMetadata)
+ .mapToPair(r -> Pair.of(r.getKey(), r))
+ .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r ->
Pair.of(r.getKey(), r)))
+ .values()
+ // keep only replaced records with no matching write-status record
+ .filter(p -> !p.getRight().isPresent())
+ .map(Pair::getLeft);
+ } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+ // load existing records from replaced filegroups and left anti join overwriting records
+ // return globally unmatched records (with newLocation being null) to be deleted from RLI
+ // Joined on the bare record key (HoodieRecord#getRecordKey()).
+ return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)
commitMetadata)
+ .mapToPair(r -> Pair.of(r.getRecordKey(), r))
+ .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r ->
Pair.of(r.getRecordKey(), r)))
+ .values()
+ // keep only replaced records with no matching write-status record
+ .filter(p -> !p.getRight().isPresent())
+ .map(Pair::getLeft);
+ } else if (operationType == WriteOperationType.DELETE_PARTITION) {
+ // all records from the target partition(s) to be deleted from RLI
+ return getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)
commitMetadata);
+ } else {
+ // no record-index changes beyond the write statuses for other operations
+ return engineContext.emptyHoodieData();
+ }
+ }
+
protected void closeInternal() {
try {
close();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e9100a518bf..614a412c4a5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -25,8 +25,8 @@ import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.MetadataConversionUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{ActionType, HoodieBaseFile,
HoodieCommitMetadata, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig,
HoodieCompactionConfig, HoodieWriteConfig}
@@ -108,7 +108,6 @@ class TestRecordLevelIndex extends
HoodieSparkClientTestBase {
saveMode = SaveMode.Append)
}
- @Disabled("needs delete support")
@ParameterizedTest
@CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false",
"MERGE_ON_READ,true", "MERGE_ON_READ,false"))
def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType,
enableRowWriter: Boolean): Unit = {
@@ -200,6 +199,26 @@ class TestRecordLevelIndex extends
HoodieSparkClientTestBase {
validateDataAndRecordIndices(hudiOpts)
}
+  /**
+   * Bulk-inserts a batch, then deletes the last partition of the data generator with delete_partition.
+   * Validates the data and the record-level index, treating that partition's records as the deleted set.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIWithDeletePartition(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    val latestSnapshot = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    // Using.resource rethrows failures from the body. Using(...) would wrap them in a Try that is
+    // discarded here, so a failing validation could never fail the test.
+    Using.resource(getHoodieWriteClient(getWriteConfig(hudiOpts))) { client =>
+      // NOTE(review): startCommit presumably already creates a requested instant with the default
+      // commit action, and the replacecommit below reuses the same instant time. Confirm the extra
+      // pending instant is intended.
+      val commitTime = client.startCommit
+      client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
+      val deletingPartition = dataGen.getPartitionPaths.last
+      val partitionList = Collections.singletonList(deletingPartition)
+      client.deletePartitions(partitionList, commitTime)
+
+      // Quote the partition value so it is compared as a string literal. Unquoted, a value such as
+      // 2015/03/17 would be parsed by Spark SQL as arithmetic division and match no rows.
+      val deletedDf = latestSnapshot.filter(s"partition = '$deletingPartition'")
+      validateDataAndRecordIndices(hudiOpts, deletedDf)
+    }
+  }
+
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = {
@@ -503,15 +522,18 @@ class TestRecordLevelIndex extends
HoodieSparkClientTestBase {
latestBatch = recordsToStrings(dataGen.generateInserts(getInstantTime(),
5)).asScala
}
val latestBatchDf =
spark.read.json(spark.sparkContext.parallelize(latestBatch, 2))
+ latestBatchDf.cache()
latestBatchDf.write.format("org.apache.hudi")
.options(hudiOpts)
.option(DataSourceWriteOptions.OPERATION.key, operation)
.mode(saveMode)
.save(basePath)
val deletedDf = calculateMergedDf(latestBatchDf, operation)
+ deletedDf.cache()
if (validate) {
validateDataAndRecordIndices(hudiOpts, deletedDf)
}
+ deletedDf.unpersist()
latestBatchDf
}