This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new a8feee9 Performing commit archiving in batches to avoid keeping a huge chunk in memory a8feee9 is described below commit a8feee929394194922405bd12b330e40e9b710fe Author: Nishith Agarwal <nagar...@uber.com> AuthorDate: Sun Apr 7 11:12:22 2019 -0700 Performing commit archiving in batches to avoid keeping a huge chunk in memory --- .../com/uber/hoodie/config/HoodieCompactionConfig.java | 9 +++++++++ .../java/com/uber/hoodie/config/HoodieWriteConfig.java | 5 +++++ .../com/uber/hoodie/io/HoodieCommitArchiveLog.java | 18 ++++++++++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 95e0c9b..dfd69c5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained"; public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits"; public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits"; + public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch"; // Upsert uses this file size to compact new data onto existing files.. public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit"; // By default, treat any file <= 100MB as a small file. @@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10"; private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30"; private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20"; + private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10); public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target" + ".partitions"; // 500GB of target IO per compaction (both read and write) @@ -240,6 +242,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withCommitsArchivalBatchSize(int batchSize) { + props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize)); + return this; + } + public HoodieCompactionConfig build() { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, @@ -281,6 +288,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED); setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP), TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); + setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP), + COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 7156623..115dd51 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -249,6 +249,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP)); } + public int getCommitArchivalBatchSize() { + return Integer + .parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP)); + } + /** * index properties **/ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index eb836d0..ccf303a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -245,11 +245,11 @@ public class HoodieCommitArchiveLog { List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); + if (records.size() >= this.config.getCommitArchivalBatchSize()) { + writeToFile(wrapperSchema, records); + } } - Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap(); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString()); - HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header); - this.writer = writer.appendBlock(block); + writeToFile(wrapperSchema, records); } catch (Exception e) { throw new HoodieCommitException("Failed to archive commits", e); } @@ -259,6 +259,16 @@ public class HoodieCommitArchiveLog { return archiveFilePath; } + private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception { + if (records.size() > 0) { + Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap(); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString()); + HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header); + this.writer = writer.appendBlock(block); + records.clear(); + } + } + private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant) throws IOException { HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();