This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch rfc-15 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a2dcacf7b370096e5a65dcefdbe9dcc7cdd80078 Author: Prashant Wason <[email protected]> AuthorDate: Fri Oct 16 18:15:31 2020 -0700 [HUDI-1346] Choose a new instant time when performing autoClean. --- .../org/apache/hudi/client/AsyncCleanerService.java | 7 ++++--- .../org/apache/hudi/client/HoodieWriteClient.java | 19 ++++++++++--------- .../org/apache/hudi/metadata/TestHoodieMetadata.java | 5 ++++- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java index 6367e79..331948d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import org.apache.hudi.async.AbstractAsyncService; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; @@ -52,11 +53,11 @@ class AsyncCleanerService extends AbstractAsyncService { }), executor); } - public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient, - String instantTime) { + public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation"); + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); asyncCleanerService.start(null); } else { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index a0019b0..614f0dc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -193,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.upsert(jsc, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -214,7 +214,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -233,7 +233,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.insert(jsc,instantTime, records); return postWrite(result, instantTime, table); } @@ -253,7 +253,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -293,7 +293,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -319,7 +319,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -335,7 +335,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT_OVERWRITE); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } @@ -425,8 +425,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo AsyncCleanerService.waitForCompletion(asyncCleanerService); LOG.info("Cleaner has finished"); } else { + // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps. LOG.info("Auto cleaning is enabled. Running cleaner now"); - clean(instantTime); + clean(); } } } @@ -569,7 +570,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * cleaned) */ public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException { - LOG.info("Cleaner started"); + LOG.info("Cleaner started for instant time " + cleanInstantTime); final Timer.Context context = metrics.getCleanCtx(); HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime); if (context != null && metadata != null) { diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java index 9395d5f..751e2ab 100644 --- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java +++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; @@ -457,6 +458,8 @@ public class TestHoodieMetadata extends HoodieClientTestHarness { init(HoodieTableType.COPY_ON_WRITE); final int maxDeltaCommitsBeforeCompaction = 6; + // Test autoClean and asyncClean based on this flag which is randomly chosen. + boolean asyncClean = new Random().nextBoolean(); HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) .withMetadataCompactionConfig(HoodieCompactionConfig.newBuilder() .archiveCommitsWith(2, 4).retainCommits(1).retainFileVersions(1).withAutoClean(true) @@ -464,7 +467,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness { .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction) .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3) - .retainCommits(1).retainFileVersions(1).withAutoClean(false).build()) + .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build()) .build(); List<HoodieRecord> records; HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, true);
