This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0e1f9653c0 [HUDI-5070] Move flaky cleaner tests to separate class (#7251)
0e1f9653c0 is described below
commit 0e1f9653c0e73287527ae62f75ba6e679cf0c1da
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Nov 21 10:41:55 2022 +0800
[HUDI-5070] Move flaky cleaner tests to separate class (#7251)
---
.../java/org/apache/hudi/table/TestCleaner.java | 279 ++++-----------------
.../clean/TestCleanerInsertAndCleanByCommits.java | 41 ++-
.../clean/TestCleanerInsertAndCleanByVersions.java | 238 ++++++++++++++++++
3 files changed, 310 insertions(+), 248 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index c1dae9afa4..7577ba8c83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
@@ -39,12 +38,9 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -61,13 +57,11 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -100,7 +94,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -110,7 +103,6 @@ import java.util.stream.Stream;
import scala.Tuple3;
import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
-import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -211,14 +203,6 @@ public class TestCleaner extends HoodieClientTestBase {
return Pair.of(newCommitTime, statuses);
}
- /**
- * Test Clean-By-Versions using insert/upsert API.
- */
- @Test
- public void testInsertAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
- }
-
/**
* Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API.
*/
@@ -228,32 +212,63 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
- * Test Clean-By-Versions using prepped versions of insert/upsert API.
+ * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
+ *
+ * @param insertFn Insert API to be tested
+ * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+ * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+ * @throws Exception in case of errors
*/
- @Test
- public void testInsertPreppedAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
- true);
- }
+ private void testInsertAndCleanFailedWritesByVersions(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
+ throws Exception {
+ int maxVersions = 3; // keep upto 3 versions for each file
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withAutoCommit(false)
+ .withHeartbeatIntervalInMs(3000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+ .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
- /**
- * Test Clean-By-Versions using bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
- }
+ final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
+ generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
- /**
- * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(
- (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
- SparkRDDWriteClient::upsertPreppedRecords, true);
- }
+ Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
+ client.commit(result.getLeft(), result.getRight());
+
+ HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
+
+ assertTrue(table.getCompletedCleanTimeline().empty());
+
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+ Pair<String, JavaRDD<WriteStatus>> ret =
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
+
+ // Await till enough time passes such that the last failed commits heartbeats are expired
+ await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
+ .isHeartbeatExpired(ret.getLeft()));
+
+ List<HoodieCleanStat> cleanStats = runCleaner(cfg);
+ assertEquals(0, cleanStats.size(), "Must not clean any files");
+ HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+ assertTrue(timeline.getTimelineOfActions(
+ CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
+ Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
+ CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
+ HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
+ timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+ // Rollback of one of the failed writes should have deleted 3 files
+ assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
+ }
+ }
/**
* Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
@@ -329,133 +344,6 @@ public class TestCleaner extends HoodieClientTestBase {
}
}
- /**
- * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
- *
- * @param insertFn Insert API to be tested
- * @param upsertFn Upsert API to be tested
- * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
- * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
- * @throws Exception in case of errors
- */
- private void testInsertAndCleanByVersions(
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
- throws Exception {
- int maxVersions = 2; // keep upto 2 versions for each file
- HoodieWriteConfig cfg = getConfigBuilder()
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
- .retainFileVersions(maxVersions).build())
- .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
- .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .build();
- try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
-
- final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
- generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
- final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
- generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
-
- insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
- Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
- for (String partitionPath : dataGen.getPartitionPaths()) {
- TableFileSystemView fsView = table.getFileSystemView();
- Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
- fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
- return true;
- }));
- if (added.isPresent()) {
- // Select only one file-group for compaction
- break;
- }
- }
-
- // Create workload with selected file-slices
- List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
- .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
- HoodieCompactionPlan compactionPlan =
- CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
- List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10);
- String compactionTime = instantTimes.get(0);
- table.getActiveTimeline().saveToCompactionRequested(
- new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
- TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
-
- instantTimes = instantTimes.subList(1, instantTimes.size());
- // Keep doing some writes and clean inline. Make sure we have expected number of files
- // remaining.
- for (String newInstantTime : instantTimes) {
- try {
- client.startCommitWithTime(newInstantTime);
- List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100);
-
- List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
-
- metaClient = HoodieTableMetaClient.reload(metaClient);
- table = HoodieSparkTable.create(getConfig(), context, metaClient);
- HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
-
- TableFileSystemView fsView = table.getFileSystemView();
- // Need to ensure the following
- for (String partitionPath : dataGen.getPartitionPaths()) {
- // compute all the versions of all files, from time 0
- HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
- for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
- HoodieCommitMetadata commitMetadata =
- HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);
-
- for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
- if (!fileIdToVersions.containsKey(wstat.getFileId())) {
- fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
- }
- fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
- }
- }
-
- List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-
- for (HoodieFileGroup fileGroup : fileGroups) {
- if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
- // Ensure latest file-slice selected for compaction is retained
- Option<HoodieBaseFile> dataFileForCompactionPresent =
- Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
- return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
- .equals(df.getCommitTime());
- }).findAny());
- assertTrue(dataFileForCompactionPresent.isPresent(),
- "Data File selected for compaction is retained");
- } else {
- // file has no more than max versions
- String fileId = fileGroup.getFileGroupId().getFileId();
- List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
-
- assertTrue(dataFiles.size() <= maxVersions,
- "fileId " + fileId + " has more than " + maxVersions + " versions");
-
- // Each file, has the latest N versions (i.e cleaning gets rid of older versions)
- List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
- for (int i = 0; i < dataFiles.size(); i++) {
- assertEquals((dataFiles.get(i)).getCommitTime(),
- commitedVersions.get(commitedVersions.size() - 1 - i),
- "File " + fileId + " does not have latest versions on commits" + commitedVersions);
- }
- }
- }
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- }
- }
-
/**
* Test Clean-By-Commits using insert/upsert API.
*/
@@ -676,7 +564,7 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d")));
}
-
+
@Test
public void testCleanWithReplaceCommits() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -1200,65 +1088,6 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}
- /**
- * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
- *
- * @param insertFn Insert API to be tested
- * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
- * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
- * @throws Exception in case of errors
- */
- private void testInsertAndCleanFailedWritesByVersions(
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
- throws Exception {
- int maxVersions = 3; // keep upto 3 versions for each file
- HoodieWriteConfig cfg = getConfigBuilder()
- .withAutoCommit(false)
- .withHeartbeatIntervalInMs(3000)
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
- .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
- .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .build();
- try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
-
- final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
- generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
- Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
- client.commit(result.getLeft(), result.getRight());
-
- HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
-
- assertTrue(table.getCompletedCleanTimeline().empty());
-
- insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
- insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
- Pair<String, JavaRDD<WriteStatus>> ret =
- insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
-
- // Await till enough time passes such that the last failed commits heartbeats are expired
- await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
- .isHeartbeatExpired(ret.getLeft()));
-
- List<HoodieCleanStat> cleanStats = runCleaner(cfg);
- assertEquals(0, cleanStats.size(), "Must not clean any files");
- HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
- assertTrue(timeline.getTimelineOfActions(
- CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
- Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
- CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
- HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
- timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
- // Rollback of one of the failed writes should have deleted 3 files
- assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
- }
- }
-
/**
* Common test method for validating pending compactions.
*
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
index 7f5cd5cd99..816a937187 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -46,7 +46,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -105,10 +104,10 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
/**
* Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
*
- * @param insertFn Insert API to be tested
- * @param upsertFn Upsert API to be tested
+ * @param insertFn Insert API to be tested
+ * @param upsertFn Upsert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
- * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+ * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
* @throws Exception in case of errors
*/
private void testInsertAndCleanByCommits(
@@ -127,23 +126,21 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
.build();
- final SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
- final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
- final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
- ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
- : dataGen::generateInserts;
- final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
- ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
- : dataGen::generateUniqueUpdates;
-
- HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
- insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
-
- // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
- for (int i = 0; i < 8; i++) {
- String newCommitTime = makeNewCommitTime();
- try {
+ try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+ final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+ : dataGen::generateInserts;
+ final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+ : dataGen::generateUniqueUpdates;
+
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+ // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
+ for (int i = 0; i < 8; i++) {
+ String newCommitTime = makeNewCommitTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
@@ -186,8 +183,6 @@ public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTes
"Only contain acceptable versions of file should be present");
}
}
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
new file mode 100644
index 0000000000..e9c74936f3
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase.Function2;
+import org.apache.hudi.testutils.HoodieClientTestBase.Function3;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
+import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCleanerInsertAndCleanByVersions extends SparkClientFunctionalTestHarness {
+
+ private static final int BATCH_SIZE = 100;
+ private static final int PARALLELISM = 2;
+
+ /**
+ * Test Clean-By-Versions using insert/upsert API.
+ */
+ @Test
+ public void testInsertAndCleanByVersions() throws Exception {
+ testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
+ }
+
+ /**
+ * Test Clean-By-Versions using prepped versions of insert/upsert API.
+ */
+ @Test
+ public void testInsertPreppedAndCleanByVersions() throws Exception {
+ testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
+ true);
+ }
+
+ /**
+ * Test Clean-By-Versions using bulk-insert/upsert API.
+ */
+ @Test
+ public void testBulkInsertAndCleanByVersions() throws Exception {
+ testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
+ }
+
+ /**
+ * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
+ */
+ @Test
+ public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
+ testInsertAndCleanByVersions(
+ (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
+ SparkRDDWriteClient::upsertPreppedRecords, true);
+ }
+
+ /**
+ * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
+ *
+ * @param insertFn Insert API to be tested
+ * @param upsertFn Upsert API to be tested
+ * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+ * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+ * @throws Exception in case of errors
+ */
+ private void testInsertAndCleanByVersions(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
+ throws Exception {
+ int maxVersions = 2; // keep upto 2 versions for each file
+ HoodieWriteConfig cfg = getConfigBuilder(true)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+ .retainFileVersions(maxVersions).build())
+ .withParallelism(PARALLELISM, PARALLELISM)
+ .withBulkInsertParallelism(PARALLELISM)
+ .withFinalizeWriteParallelism(PARALLELISM)
+ .withDeleteParallelism(PARALLELISM)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+ final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+ : dataGen::generateInserts;
+ final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+ : dataGen::generateUniqueUpdates;
+
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+ Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ TableFileSystemView fsView = table.getFileSystemView();
+ Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
+ fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
+ return true;
+ }));
+ if (added.isPresent()) {
+ // Select only one file-group for compaction
+ break;
+ }
+ }
+
+ // Create workload with selected file-slices
+ List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
+ .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
+ HoodieCompactionPlan compactionPlan =
+ CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
+ List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10);
+ String compactionTime = instantTimes.get(0);
+ table.getActiveTimeline().saveToCompactionRequested(
+ new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
+ TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));
+
+ instantTimes = instantTimes.subList(1, instantTimes.size());
+ // Keep doing some writes and clean inline. Make sure we have expected number of files
+ // remaining.
+ for (String newInstantTime : instantTimes) {
+ client.startCommitWithTime(newInstantTime);
+ List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE);
+
+ List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newInstantTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieSparkTable.create(cfg, context(), metaClient);
+ HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
+
+ TableFileSystemView fsView = table.getFileSystemView();
+ // Need to ensure the following
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ // compute all the versions of all files, from time 0
+ HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
+ for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
+ HoodieCommitMetadata commitMetadata =
+ HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);
+
+ for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
+ if (!fileIdToVersions.containsKey(wstat.getFileId())) {
+ fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
+ }
+ fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
+ }
+ }
+
+ List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+
+ for (HoodieFileGroup fileGroup : fileGroups) {
+ if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
+ // Ensure latest file-slice selected for compaction is retained
+ Option<HoodieBaseFile> dataFileForCompactionPresent =
+ Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
+ return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
+ .equals(df.getCommitTime());
+ }).findAny());
+ assertTrue(dataFileForCompactionPresent.isPresent(),
+ "Data File selected for compaction is retained");
+ } else {
+ // file has no more than max versions
+ String fileId = fileGroup.getFileGroupId().getFileId();
+ List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
+
+ assertTrue(dataFiles.size() <= maxVersions,
+ "fileId " + fileId + " has more than " + maxVersions + " versions");
+
+ // Each file, has the latest N versions (i.e cleaning gets rid of older versions)
+ List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
+ for (int i = 0; i < dataFiles.size(); i++) {
+ assertEquals((dataFiles.get(i)).getCommitTime(),
+ commitedVersions.get(commitedVersions.size() - 1 - i),
+ "File " + fileId + " does not have latest versions on commits" + commitedVersions);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}