lokeshj1703 commented on code in PR #12656:
URL: https://github.com/apache/hudi/pull/12656#discussion_r1923146230
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -2537,63 +2542,90 @@ public void testCleaningArchivingAndCompaction() throws
Exception {
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsinMdt += 1;
}
- HoodieTableMetaClient metadataMetaClient =
createMetaClient(metadataTableBasePath);
- HoodieTableMetaClient datasetMetaClient =
createMetaClient(config.getBasePath());
-
+ metadataMetaClient = createMetaClient(metadataTableBasePath);
+ datasetMetaClient = createMetaClient(config.getBasePath());
// There should not be any compaction yet and we have not performed more
than maxDeltaCommitsBeforeCompaction
- // deltacommits (1 will be due to bootstrap)
+ // deltacommits (1 will be due to bootstrap FILEs and 2nd one for cols
stats bootstrap)
HoodieActiveTimeline metadataTimeline =
metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
0);
-
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
maxDeltaCommitsBeforeCompaction - 1);
+ assertEquals(maxDeltaCommitsBeforeCompaction + 1,
metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(),
0);
// Next commit will initiate a compaction
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsinMdt += 1;
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
1);
-
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(),
maxDeltaCommitsBeforeCompaction + 1);
+ assertEquals(maxDeltaCommitsBeforeCompaction + 2,
metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(),
0);
+ }
+
+ // trigger async clustering, but do not cluster.
+ HoodieWriteConfig configForClustering = getWriteConfigBuilder(true, true,
false)
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
+
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(4, 5).build())
+
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+ .build();
- // More than maxDeltaCommitsBeforeCompaction commits
- String inflightCommitTime = newCommitTime;
- for (int i = 0; i < maxDeltaCommitsBeforeCompaction + 1; ++i) {
+ Option<String> pendingClustering = Option.empty();
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
configForClustering)) {
+ pendingClustering = client.scheduleClustering(Option.empty());
+ }
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
config)) {
+ for (int i = 0; i < maxDeltaCommitsBeforeCompaction; ++i) {
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
- client.startCommitWithTime(newCommitTime);
+ client.startCommitWithTime(newCommitTime); // when i = 2 completes,
1st commit gets archived
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
- if (i == 0) {
- // Mark this commit inflight so compactions don't take place
- FileCreateUtils.deleteCommit(basePath, newCommitTime);
- FileCreateUtils.createInflightCommit(basePath, newCommitTime);
- inflightCommitTime = newCommitTime;
+ totalExpectedDeltaCommitsinMdt += 1;
+ if (i == 2) {
Review Comment:
Would this only happen at i == 2 or also in subsequent iterations?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -2509,13 +2509,14 @@ public void testReader() throws Exception {
* <p>
* Metadata Table should be automatically compacted as per config.
*/
- @Disabled
+ @Test
public void testCleaningArchivingAndCompaction() throws Exception {
Review Comment:
Should we reduce the complexity of the test or break it into separate tests?
The clustering-related behaviour could probably be verified in a separate test.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -2509,13 +2509,14 @@ public void testReader() throws Exception {
* <p>
* Metadata Table should be automatically compacted as per config.
*/
- @Disabled
+ @Test
public void testCleaningArchivingAndCompaction() throws Exception {
init(HoodieTableType.COPY_ON_WRITE, false);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
- final int maxDeltaCommitsBeforeCompaction = 3;
+ final int maxDeltaCommitsBeforeCompaction = 5;
Review Comment:
NIT: Rename to `maxDeltaCommitsBeforeCompactionInMDT`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]