This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 820006e025aff01b2e91450d7d8e36efd981cfae
Author: Danny Chan <[email protected]>
AuthorDate: Tue Feb 7 23:41:50 2023 +0800

    [HUDI-5718] Unsupported Operation Exception for compaction (#7874)
---
 .../table/action/compact/TestHoodieCompactor.java | 118 +++++++++++++++------
 .../table/log/HoodieMergedLogRecordScanner.java   |   3 +-
 2 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c6cd554e289..c0e62631664 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,9 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -113,7 +115,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCompactionEmpty() throws Exception {
+  public void testCompactionEmpty() {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
@@ -169,41 +171,45 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTable table = HoodieSparkTable.create(config, context);
       newCommitTime = "101";
+      updateRecords(config, newCommitTime, records);
 
-      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
-      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
-      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+      assertLogFilesNumEqualsTo(config, 1);
 
+      String compactionInstantTime = "102";
+      HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);
+
+      verifyCompaction(result);
+    }
+  }
+
+  @Test
+  public void testSpillingWhenCompaction() throws Exception {
+    // insert 100 records
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
+            .withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
-      writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
-      metaClient.reloadActiveTimeline();
-
-      // Verify that all data file has one log file
-      table = HoodieSparkTable.create(config, context);
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<FileSlice> groupedLogFiles =
-            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-        for (FileSlice fileSlice : groupedLogFiles) {
-          assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
-        }
-      }
-
-      // Do a compaction
-      table = HoodieSparkTable.create(config, context);
-      String compactionInstantTime = "102";
-      table.scheduleCompaction(context, compactionInstantTime, Option.empty());
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
-          context, compactionInstantTime).getWriteStatuses();
-
-      // Verify that all partition paths are present in the WriteStatus result
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<WriteStatus> writeStatuses = result.collectAsList();
-        assertTrue(writeStatuses.stream()
-            .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // trigger two rounds of updates, each followed by a compaction
+      for (int i = 1; i < 5; i += 2) {
+        // Update all the 100 records
+        newCommitTime = "10" + i;
+        updateRecords(config, newCommitTime, records);
+
+        assertLogFilesNumEqualsTo(config, 1);
+
+        HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
+
+        verifyCompaction(result);
+      }
     }
   }
 
@@ -212,4 +218,52 @@
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  private void updateRecords(HoodieWriteConfig config, String newCommitTime, List<HoodieRecord> records) throws IOException {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+    HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
+    JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+
+    writeClient.startCommitWithTime(newCommitTime);
+    writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+    metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Verify that each data file has {@code expected} log files.
+   *
+   * @param config   The writer config
+   * @param expected The expected number of log files
+   */
+  private void assertLogFilesNumEqualsTo(HoodieWriteConfig config, int expected) {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles =
+          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals(expected, fileSlice.getLogFiles().count(), "There should be " + expected + " log file(s) written for every data file");
+      }
+    }
+  }
+
+  /**
+   * Do a compaction.
+   */
+  private HoodieData<WriteStatus> compact(SparkRDDWriteClient writeClient, String compactionInstantTime) {
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime).getWriteStatuses();
+    return HoodieListData.eager(writeStatusJavaRDD.collect());
+  }
+
+  /**
+   * Verify that all partition paths are present in the WriteStatus result.
+   */
+  private void verifyCompaction(HoodieData<WriteStatus> result) {
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<WriteStatus> writeStatuses = result.collectAsList();
+      assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e5ce343eb39..1a256956bfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@ import org.apache.log4j.Logger;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -215,7 +214,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   }
 
   public Map<String, HoodieRecord> getRecords() {
-    return Collections.unmodifiableMap(records);
+    return records;
  }
 
   public HoodieRecordType getRecordType() {
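
Background on the fix: getRecords() used to return Collections.unmodifiableMap(records), and any mutating call the compaction path makes through that read-only view fails at runtime with exactly the UnsupportedOperationException named in the subject. Below is a minimal, self-contained sketch of that failure mode; the class name, keys, and values are hypothetical stand-ins for Hudi's record map, not Hudi types:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class UnmodifiableMapDemo {
      public static void main(String[] args) {
        Map<String, String> records = new HashMap<>();
        records.put("uuid-1", "record-1");

        // What getRecords() returned before this commit: a read-only view
        // over the scanner's live record map.
        Map<String, String> view = Collections.unmodifiableMap(records);

        // Any write through the view throws; this is the class of failure
        // the compaction path tripped over.
        try {
          view.remove("uuid-1");
        } catch (UnsupportedOperationException e) {
          System.out.println("caught: " + e);
        }
      }
    }

Returning the live map instead shifts the no-mutation expectation onto callers; since the scanner already imports and carries the @NotThreadSafe annotation (visible in the hunk context above), this stays within its documented guarantees.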

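On the new testSpillingWhenCompaction: withMaxMemoryMaxSize(1L, 1L) leaves effectively no in-memory budget, so while merging log records the scanner's map spills (the inline comment says "force spill"), and compaction then has to operate on that spilled, live map. The toy sketch below illustrates only the budget-then-spill idea; it is a hypothetical stand-in, not Hudi's actual spillable map implementation:

    import java.util.HashMap;
    import java.util.Map;

    // Toy stand-in: keep entries in memory until a budget is exhausted,
    // then overflow to a secondary store (a second map here, standing in
    // for the on-disk half of a spillable map).
    public class TinySpillableMap<K, V> {
      private final int inMemoryBudget;
      private final Map<K, V> inMemory = new HashMap<>();
      private final Map<K, V> spilled = new HashMap<>();

      public TinySpillableMap(int inMemoryBudget) {
        this.inMemoryBudget = inMemoryBudget;
      }

      public void put(K key, V value) {
        if (inMemory.containsKey(key) || inMemory.size() < inMemoryBudget) {
          inMemory.put(key, value);
        } else {
          spilled.put(key, value); // over budget: spill
        }
      }

      public V get(K key) {
        V v = inMemory.get(key);
        return v != null ? v : spilled.get(key);
      }

      public static void main(String[] args) {
        // A budget of 1, like the test's 1-byte memory cap, spills almost everything.
        TinySpillableMap<String, String> map = new TinySpillableMap<>(1);
        for (int i = 0; i < 100; i++) {
          map.put("uuid-" + i, "record-" + i);
        }
        System.out.println(map.get("uuid-99")); // served from the spilled store
      }
    }

With a budget this small nearly every record takes the spill path, which is the same effect the 1-byte cap has in the test and is what lets it reproduce the exception fixed here.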