nsivabalan commented on code in PR #8758: URL: https://github.com/apache/hudi/pull/8758#discussion_r1213939512
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { + this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + + @Override + public int getPartition(Object key) { + return ((Tuple2<Integer, String>)key)._1; + } + + @Override + public int numPartitions() { + return numPartitions; + } + } + + // FileIDs for the various partitions + private List<String> fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> records, int outputSparkPartitions) { + Comparator<Tuple2<Integer, String>> keyComparator = + (Comparator<Tuple2<Integer, String>> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + + // Partition the records by their file group + JavaRDD<HoodieRecord> partitionedRDD = records + // key by <file group index, record key>. The file group index is used to partition and the record key is used to sort within the partition. + .keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); + }) + .repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) + .map(t -> t._2); + + fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. 
+ List<String> fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { + HoodieRecord record = recordItr.next(); + final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); + fileIds.add(fileID); + } else { + // FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). In the special case that recordsRDD has fewer + // records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. + // But we still need to return a fileID for use within {@code BulkInsertMapFunction} + fileIds.add(""); Review Comment: lets use StringUtils.EMPTY_STRING ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java: ########## @@ -118,46 +123,32 @@ protected void initRegistry() { } @Override - protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext, - Option<T> actionMetadata, - Option<String> inflightInstantTimestamp) { - try { - metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { - if (registry instanceof DistributedRegistry) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; - ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext()); - } - }); + protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) { + commitInternal(instantTime, partitionRecordsMap, Option.empty()); + } - if (enabled) { - initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); - } - } catch (IOException e) { - LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); - enabled = false; - } + protected void bulkCommit( + String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records, + int fileGroupCount) { + Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = new HashMap<>(); + partitionRecordsMap.put(partitionType, records); Review Comment: Maybe we can use Collections.singletonMap() here instead of building a HashMap. ########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -234,6 +235,55 @@ public final class HoodieMetadataConfig extends HoodieConfig { .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to " + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant."); + public static final ConfigProperty<Boolean> RECORD_INDEX_CREATE_PROP = ConfigProperty Review Comment: Can we follow the same key pattern as the other configs? For example: 
hoodie.metadata.record.index.enable ########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -234,6 +235,55 @@ public final class HoodieMetadataConfig extends HoodieConfig { .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to " + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant."); + public static final ConfigProperty<Boolean> RECORD_INDEX_CREATE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.create") + .defaultValue(false) + .sinceVersion("0.14.0") + .withDocumentation("Create the HUDI Record Index within the Metadata Table"); + + public static final ConfigProperty<Integer> RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.min.filegroup.count") + .defaultValue(10) + .sinceVersion("0.14.0") + .withDocumentation("Minimum number of file groups to use for Record Index."); + + public static final ConfigProperty<Integer> RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.max.filegroup.count") + .defaultValue(1000) + .sinceVersion("0.14.0") + .withDocumentation("Maximum number of file groups to use for Record Index."); + + public static final ConfigProperty<Integer> RECORD_INDEX_MAX_FILE_GROUP_SIZE_BYTES_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.max.filegroup.size") + .defaultValue(1024 * 1024 * 1024) + .sinceVersion("0.14.0") + .withDocumentation("Maximum size in bytes of a single file group. 
Large file group takes longer to compact."); + + public static final ConfigProperty<Float> RECORD_INDEX_GROWTH_FACTOR_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.growth.factor") + .defaultValue(2.0f) + .sinceVersion("0.14.0") + .withDocumentation("The current number of records are multiplied by this number when estimating the number of " + + "file groups to create automatically. This helps account for growth in the number of records in the dataset."); + + public static final ConfigProperty<Long> MAX_READER_MEMORY_PROP = ConfigProperty + .key(METADATA_PREFIX + ".max.reader.memory") + .defaultValue(1024 * 1024 * 1024L) + .sinceVersion("0.14.0") + .withDocumentation("Max memory to use for the reader to read from metadata"); + + public static final ConfigProperty<Integer> MAX_READER_BUFFER_SIZE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".max.reader.buffer.size") + .defaultValue(10 * 1024 * 1024) + .sinceVersion("0.14.0") + .withDocumentation("Max memory to use for the reader buffer while merging log blocks"); + + public static final ConfigProperty<String> SPILLABLE_MAP_DIR_PROP = ConfigProperty + .key(METADATA_PREFIX + ".spillable.dir") + .defaultValue(FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()) Review Comment: do you think we can add a infer function. infer from spillable dir set for data table and then prefix with METADATA_PREFIX. 
########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -234,6 +235,55 @@ public final class HoodieMetadataConfig extends HoodieConfig { .withDocumentation("When there is a pending instant in data table, this config limits the allowed number of deltacommits in metadata table to " + "prevent the metadata table's timeline from growing unboundedly as compaction won't be triggered due to the pending data table instant."); + public static final ConfigProperty<Boolean> RECORD_INDEX_CREATE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.create") + .defaultValue(false) + .sinceVersion("0.14.0") + .withDocumentation("Create the HUDI Record Index within the Metadata Table"); + + public static final ConfigProperty<Integer> RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.min.filegroup.count") + .defaultValue(10) + .sinceVersion("0.14.0") + .withDocumentation("Minimum number of file groups to use for Record Index."); + + public static final ConfigProperty<Integer> RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.max.filegroup.count") + .defaultValue(1000) + .sinceVersion("0.14.0") + .withDocumentation("Maximum number of file groups to use for Record Index."); + + public static final ConfigProperty<Integer> RECORD_INDEX_MAX_FILE_GROUP_SIZE_BYTES_PROP = ConfigProperty + .key(METADATA_PREFIX + ".record.index.max.filegroup.size") + .defaultValue(1024 * 1024 * 1024) Review Comment: probably we should make 120 Mb as default similar to our max parquet file size. ########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java: ########## @@ -545,6 +545,33 @@ public void close() { } } + @Override + public ClosableIterator<String> getRecordKeyIterator() { + final HFileScanner scanner = reader.getScanner(false, false); Review Comment: may I know why are we not caching? 
Maybe we can enable it by calling the method below: ``` getHFileScanner(reader, true); ``` ########## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ########## @@ -694,17 +695,80 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public List<String> getMetadataPartitionsInflight() { - return StringUtils.split( - getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), - CONFIG_VALUES_DELIMITER - ); + public Set<String> getMetadataPartitionsInflight() { + return new HashSet<>(StringUtils.split( + getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), + CONFIG_VALUES_DELIMITER)); } public Set<String> getMetadataPartitions() { return new HashSet<>( - StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), - CONFIG_VALUES_DELIMITER)); + StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), + CONFIG_VALUES_DELIMITER)); + } + + /** + * @returns true if metadata table has been created and is being used for this dataset, else returns false. + */ + public boolean isMetadataTableEnabled() { + return isMetadataPartitionEnabled(MetadataPartitionType.FILES); + } + + /** + * Checks if metadata table is enabled and the specified partition has been initialized. + * + * @param partition The partition to check + * @returns true if the specific partition has been initialized, else returns false. + */ + public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) { + return getMetadataPartitions().contains(partition.getPartitionPath()); + } + + /** + * Enables or disables the specified metadata table partition. 
+ * + * @param partitionType The partition + * @param enabled If true, the partition is enabled, else disabled + */ + public void setMetadataPartitionState(HoodieTableMetaClient metaClient, MetadataPartitionType partitionType, boolean enabled) { + ValidationUtils.checkArgument(!partitionType.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), + "Metadata Table partition path cannot contain a comma: " + partitionType.getPartitionPath()); + Set<String> partitions = getMetadataPartitions(); + Set<String> partitionsInflight = getMetadataPartitionsInflight(); + if (enabled) { + partitions.add(partitionType.getPartitionPath()); + partitionsInflight.remove(partitionType.getPartitionPath()); + } else if (partitionType.equals(MetadataPartitionType.FILES)) { + // file listing partition is required for all other partitions to work + // Disabling file partition will also disable all partitions + partitions.clear(); + partitionsInflight.clear(); + } else { + partitions.remove(partitionType.getPartitionPath()); + partitionsInflight.remove(partitionType.getPartitionPath()); + } + setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); + setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); + update(metaClient.getFs(), new Path(metaClient.getMetaPath()), getProps()); + LOG.info(String.format("MDT %s partition %s has been %s", metaClient.getBasePathV2(), partitionType, enabled ? "enabled" : "disabled")); + } + + /** + * Enables the specified metadata table partition as inflight. + * + * @param partitionTypes The list of partitions to enable as inflight. + */ + public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, List<MetadataPartitionType> partitionTypes) { Review Comment: do we have tests for these ? 
########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -318,6 +368,38 @@ public int getMaxNumDeltacommitsWhenPending() { return getIntOrDefault(METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING); } + public boolean createRecordIndex() { Review Comment: lets fix the getter and setter names when you fix the variable naming. ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1378,6 +1340,254 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC */ public static boolean isIndexingCommit(String instantTime) { return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length() - && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + } + + /** + * Delete the metadata table for the dataset and backup if required. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param context instance of {@link HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_<current_timestamp>. 
+ * @return The backup directory if backup was requested + */ + public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) { + final Path metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()); + FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), context.getHadoopConf().get()); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, false); + try { + if (!fs.exists(metadataTablePath)) { + return null; + } + } catch (FileNotFoundException e) { + // Ignoring exception as metadata table already does not exist + return null; + } catch (IOException e) { + throw new HoodieMetadataException("Failed to check metadata table existence", e); + } + + if (backup) { + final Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + HoodieActiveTimeline.createNewInstantTime()); + LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion"); + try { + if (fs.rename(metadataTablePath, metadataBackupPath)) { + return metadataBackupPath.toString(); + } + } catch (Exception e) { + // If rename fails, we will ignore the backup and still delete the MDT + LOG.error("Failed to backup metadata table using rename", e); + } + } + + LOG.info("Deleting metadata table from " + metadataTablePath); + try { + fs.delete(metadataTablePath, true); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to delete metadata table from path " + metadataTablePath, e); + } + + return null; + } + + /** + * Delete a partition within the metadata table. + * <p> + * This can be used to delete a partition so that it can be re-bootstrapped. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param context instance of {@code HoodieEngineContext}. 
+ * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_<current_timestamp>. + * @param partitionType The partition to delete + * @return The backup directory if backup was requested, null otherwise + */ + public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, Review Comment: do we have tests for these methods ? ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1378,6 +1340,254 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC */ public static boolean isIndexingCommit(String instantTime) { return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length() - && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + } + + /** + * Delete the metadata table for the dataset and backup if required. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param context instance of {@link HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_<current_timestamp>. 
+ * @return The backup directory if backup was requested + */ + public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) { + final Path metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()); + FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), context.getHadoopConf().get()); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, false); + try { + if (!fs.exists(metadataTablePath)) { + return null; + } + } catch (FileNotFoundException e) { + // Ignoring exception as metadata table already does not exist + return null; + } catch (IOException e) { + throw new HoodieMetadataException("Failed to check metadata table existence", e); + } + + if (backup) { + final Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + HoodieActiveTimeline.createNewInstantTime()); + LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion"); + try { + if (fs.rename(metadataTablePath, metadataBackupPath)) { + return metadataBackupPath.toString(); + } + } catch (Exception e) { + // If rename fails, we will ignore the backup and still delete the MDT + LOG.error("Failed to backup metadata table using rename", e); + } + } + + LOG.info("Deleting metadata table from " + metadataTablePath); + try { + fs.delete(metadataTablePath, true); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to delete metadata table from path " + metadataTablePath, e); + } + + return null; + } + + /** + * Delete a partition within the metadata table. + * <p> + * This can be used to delete a partition so that it can be re-bootstrapped. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param context instance of {@code HoodieEngineContext}. 
+ * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_<current_timestamp>. + * @param partitionType The partition to delete + * @return The backup directory if backup was requested, null otherwise + */ + public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, + MetadataPartitionType partitionType, boolean backup) { + if (partitionType.equals(MetadataPartitionType.FILES)) { + return deleteMetadataTable(dataMetaClient, context, backup); + } + + final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionType.getPartitionPath()); + FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get()); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false); + try { + if (!fs.exists(metadataTablePartitionPath)) { + return null; + } + } catch (FileNotFoundException e) { + // Ignoring exception as metadata table already does not exist + LOG.debug("Metadata table partition " + partitionType + " not found at path " + metadataTablePartitionPath); + return null; + } catch (Exception e) { + throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", partitionType, metadataTablePartitionPath), e); + } + + if (backup) { + final Path metadataPartitionBackupPath = new Path(metadataTablePartitionPath.getParent().getParent(), + String.format(".metadata_%s_%s", partitionType.getPartitionPath(), HoodieActiveTimeline.createNewInstantTime())); + LOG.info(String.format("Backing up MDT partition %s to %s before deletion", partitionType, metadataPartitionBackupPath)); + try { + if (fs.rename(metadataTablePartitionPath, metadataPartitionBackupPath)) { + return metadataPartitionBackupPath.toString(); + } + } catch (Exception e) { + // If rename 
fails, we will try to delete the table instead + LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionType), e); + } + } else { + LOG.info("Deleting metadata table partition from " + metadataTablePartitionPath); + try { + fs.delete(metadataTablePartitionPath, true); + } catch (Exception e) { + throw new HoodieMetadataException("Failed to delete metadata table partition from path " + metadataTablePartitionPath, e); + } + } + + return null; + } + + /** + * Return the complete fileID for a file group within a MDT partition. + * <p> + * MDT fileGroups have the format <fileIDPrefix>-<index>. The fileIDPrefix is hardcoded for each MDT partition and index is an integer. + * + * @param partitionType The type of the MDT partition + * @param index Index of the file group within the partition + * @return The fileID + */ + public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) { + return String.format("%s%04d", partitionType.getFileIdPrefix(), index); + } + + /** + * Extract the index from the fileID of a file group in the MDT partition. See {@code getFileIDForFileGroup} for the format of the fileID. + * + * @param fileId fileID of a file group. + * @return The index of file group + */ + public static int getFileGroupIndexFromFileId(String fileId) { + final int endIndex = getFileIdLengthWithoutFileIndex(fileId); + final int fromIndex = fileId.lastIndexOf("-", endIndex - 1); + return Integer.parseInt(fileId.substring(fromIndex + 1, endIndex)); + } + + /** + * Extract the fileID prefix from the fileID of a file group in the MDT partition. See {@code getFileIDForFileGroup} for the format of the fileID. + * + * @param fileId fileID of a file group. 
+ * @return The fileID without the file index + */ + public static String getFileGroupPrefix(String fileId) { + return fileId.substring(0, getFileIdLengthWithoutFileIndex(fileId)); + } + + /** + * Returns the length of the fileID ignoring the fileIndex suffix + * <p> + * 0.10 version MDT code added -0 (0th fileIndex) to the fileID. This was removed later. + * <p> + * Examples: + * 0.11+ version: fileID: files-0000 returns 10 + * 0.10 version: fileID: files-0000-0 returns 10 + * + * @param fileId The fileID + * @return The length of the fileID ignoring the fileIndex suffix + */ + private static int getFileIdLengthWithoutFileIndex(String fileId) { + return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length(); + } + + /** + * Create the timestamp for a clean operation on the metadata table. + */ + public static String createCleanTimestamp(String timestamp) { + return timestamp + CLEAN_TIMESTAMP_SUFFIX; + } + + /** + * Create the timestamp for a compaction operation on the metadata table. + */ + public static String createCompactionTimestamp(String timestamp) { + return timestamp + COMPACTION_TIMESTAMP_SUFFIX; + } + + /** + * Create the timestamp for an index initialization operation on the metadata table. + * <p> + * Since many MDT partitions can be initialized one after other the offset parameter controls generating a + * unique timestamp. + */ + public static String createIndexInitTimestamp(String timestamp, int offset) { + return String.format("%s%03d", timestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset); + } + + /** + * Estimates the file group count to use for a MDT partition. + * + * @param partitionType Type of the partition for which the file group count is to be estimated. + * @param recordCount The number of records expected to be written. + * @param averageRecordSize Average size of each record to be writen. + * @param minFileGroupCount Minimum number of file groups to use. + * @param maxFileGroupCount Maximum number of file groups to use. 
+ * @param growthFactor By what factor are the records (recordCount) expected to grow? + * @param maxFileGroupSizeBytes Maximum size of the file group. + * @return The estimated number of file groups. + */ + public static int estimateFileGroupCount(MetadataPartitionType partitionType, long recordCount, int averageRecordSize, int minFileGroupCount, + int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) { + int fileGroupCount; + + // If a fixed number of file groups are desired + if ((minFileGroupCount == maxFileGroupCount) && (minFileGroupCount != 0)) { + fileGroupCount = minFileGroupCount; + } else { + // Number of records to estimate for + final long expectedNumRecords = (long) Math.ceil((float) recordCount * growthFactor); + // Maximum records that should be written to each file group so that it does not go over the size limit required + final long maxRecordsPerFileGroup = maxFileGroupSizeBytes / Math.max(averageRecordSize, 1L); + final long estimatedFileGroupCount = expectedNumRecords / maxRecordsPerFileGroup; + + if (estimatedFileGroupCount >= maxFileGroupCount) { + fileGroupCount = maxFileGroupCount; + } else if (estimatedFileGroupCount <= minFileGroupCount) { + fileGroupCount = minFileGroupCount; + } else { + fileGroupCount = Math.max(1, (int) estimatedFileGroupCount); + } + } + + LOG.info(String.format("Estimated file group count for MDT partition %s is %d " + + "[recordCount=%d, avgRecordSize=%d, minFileGroupCount=%d, maxFileGroupCount=%d, growthFactor=%f, " + + "maxFileGroupSizeBytes=%d]", partitionType, fileGroupCount, recordCount, averageRecordSize, minFileGroupCount, + maxFileGroupCount, growthFactor, maxFileGroupSizeBytes)); + return fileGroupCount; + } + + /** + * Returns true if any enabled metadata partition in the given hoodie table requires WriteStatus to track the written records. 
+ * + * @param config MDT config + * @param metaClient {@code HoodieTableMetaClient} of the data table + * @return true if WriteStatus should track the written records else false. + */ + public static boolean needsWriteStatusTracking(HoodieMetadataConfig config, HoodieTableMetaClient metaClient) { Review Comment: I have suggested a rename for a similar method elsewhere. When you fix that one, feel free to rename this method as well. ########## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java: ########## @@ -67,12 +57,20 @@ public static List<String> allPaths() { ); } + /** + * Returns the list of metadata table partitions which require WriteStatus to track written records. + * <p> + * These partitions need the list of written records so that they can update their metadata. + */ + public static List<MetadataPartitionType> needWriteStatusTracking() { Review Comment: Suggested name: getMetadataPartitionsNeedingWriteStatusTracking -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
