nsivabalan commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1213814148
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -111,18 +111,27 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
+  // Virtual keys support for metadata table. This Field is
+  // from the metadata payload schema.
+  private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;
+
+  // Average size of a record saved within the record index.
+  // Record index has a fixed size schema. This has been calculated based on experiments with default settings
+  // for block size (4MB), compression (GZ) and disabling the hudi metadata fields.
Review Comment:
The default HFile block size on write in OSS is 1MB. Do we need to fix that?
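For context on how the average-record-size constant above feeds sizing decisions, here is a rough, illustrative sketch of a file group count estimate. The formula and bounds are assumptions for illustration only, not Hudi's actual `estimateFileGroupCount` implementation:

```java
public class FileGroupEstimateSketch {
  // Estimate how many file groups are needed for a given record count,
  // clamped between configured min and max counts (illustrative logic).
  static int estimateFileGroupCount(long recordCount, int avgRecordSizeBytes,
                                    int minCount, int maxCount, long maxFileGroupSizeBytes) {
    long totalBytes = recordCount * (long) avgRecordSizeBytes;
    // ceiling division: number of file groups needed at the max size per group
    long needed = (totalBytes + maxFileGroupSizeBytes - 1) / maxFileGroupSizeBytes;
    return (int) Math.max(minCount, Math.min(maxCount, needed));
  }

  public static void main(String[] args) {
    // 100M records at ~48 bytes each, 1 GiB max per file group
    System.out.println(estimateFileGroupCount(100_000_000L, 48, 1, 10_000, 1L << 30));
  }
}
```

The point of the sketch: the assumed average record size directly scales the estimate, which is why the block size and compression assumptions behind that constant matter.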
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -370,11 +336,10 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
   }
     // Detect the commit gaps if any from the data and the metadata active timeline
-    if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
-        latestMetadataInstant.get().getTimestamp())
+    if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstantTimestamp)
         && !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) {
       LOG.error("Metadata Table will need to be re-initialized as un-synced instants have been archived."
-          + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+          + " latestMetadataInstant=" + latestMetadataInstantTimestamp
Review Comment:
Are lines 339 to 344 a valid scenario with the sync design? Maybe they were written when the async design was in place.
I am looking to simplify these so that they are maintainable.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -370,11 +336,10 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
   }
     // Detect the commit gaps if any from the data and the metadata active timeline
-    if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
-        latestMetadataInstant.get().getTimestamp())
+    if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstantTimestamp)
Review Comment:
L329 is misleading: even for a fresh table, we are logging
```
Metadata Table will need to be re-initialized as no instants were found
```
Can we fix that?
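A possible shape for the fix, sketched with illustrative method and message names (not the actual Hudi code): distinguish a genuinely fresh table from one whose un-synced instants were archived before choosing the log message.

```java
public class BootstrapLogSketch {
  // Pick a log message based on why no usable metadata instant was found.
  static String bootstrapReason(boolean dataTimelineEmpty, boolean metadataInstantFound) {
    if (metadataInstantFound) {
      return "Metadata table is up to date";
    }
    if (dataTimelineEmpty) {
      // fresh table: nothing to re-initialize, this is a first-time bootstrap
      return "Metadata table will be initialized for the first time on this fresh table";
    }
    return "Metadata table will need to be re-initialized as no instants were found";
  }

  public static void main(String[] args) {
    System.out.println(bootstrapReason(true, false));
    System.out.println(bootstrapReason(false, false));
  }
}
```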
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
                                            Option<String> inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+        || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          return Pair.of(partitionName, p.getFileNameToSizeMap());
+        })
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
+      String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
-  private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the next index initialization.
+    // This function would be called multiple times in a single application if multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList()));
+    }
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX, recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                               List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath);
Review Comment:
Can we use try-with-resources so that the reader auto-closes?
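A minimal sketch of the try-with-resources pattern being suggested. `KeyReader` below is a hypothetical stand-in for `HoodieFileReader` (the suggestion assumes the reader is closeable, which is what makes the pattern applicable):

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TryWithResourcesSketch {
  // Minimal stand-in for a file reader that must be closed after use.
  static class KeyReader implements Closeable {
    private final Iterator<String> keys;
    private boolean closed = false;

    KeyReader(List<String> keys) {
      this.keys = keys.iterator();
    }

    String next() {
      return keys.hasNext() ? keys.next() : null;
    }

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  static KeyReader lastReader; // exposed only so the sketch can verify closing

  // try-with-resources guarantees close() runs even if reading throws mid-iteration.
  static int countKeys(List<String> keys) {
    try (KeyReader reader = new KeyReader(keys)) {
      lastReader = reader;
      int count = 0;
      while (reader.next() != null) {
        count++;
      }
      return count;
    }
  }

  public static void main(String[] args) {
    int n = countKeys(Arrays.asList("key1", "key2", "key3"));
    System.out.println(n + " keys read; reader closed: " + lastReader.isClosed());
  }
}
```

The same shape would apply to the `HoodieFileReader` above: declare it in the `try (...)` header instead of as a local, and the explicit close call (and any leak on exception) goes away.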
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -933,52 +1041,80 @@ protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType,
   }
   /**
-   * Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
-   *    a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
-   *
-   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
-   *    deltacommit.
+   * Optimize the metadata table by running compaction, clean and archive as required.
+   * <p>
+   * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons:
+   * - The compaction will contain the correct data as all failed operations have been rolled back.
+   * - Clean/compaction etc. will have the highest timestamp on the MDT and we won't be adding new operations
+   *   with smaller timestamps to metadata table (makes for easier debugging)
+   * <p>
+   * This adds the limitations that long-running async operations (clustering, etc.) may cause delay in such MDT
+   * optimizations. We will relax this after MDT code has been hardened.
    */
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
-    // finish off any pending compactions if any from previous attempt.
-    writeClient.runAnyPendingCompactions();
-
-    String latestDeltaCommitTimeInMetadataTable = metadataMetaClient.reloadActiveTimeline()
-        .getDeltaCommitTimeline()
-        .filterCompletedInstants()
-        .lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table"))
-        .getTimestamp();
-    // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
-    // Whenever you want to change this logic, please ensure all below scenarios are considered.
-    // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-    // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents
-    //    any instants before that is already synced with metadata table.
-    // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
-    //    instant before c4 is synced with metadata table.
-    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+  @Override
+  public void performTableServices(Option<String> inFlightInstantTimestamp) {
+    HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+    boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+    try {
+      BaseHoodieWriteClient writeClient = getWriteClient();
+      // Run any pending table services operations.
+      runPendingTableServicesOperations(writeClient);
+
+      // Check and run clean operations.
+      String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+          .filterCompletedInstants()
+          .lastInstant().get()
+          .getTimestamp();
+      LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations.");
Review Comment:
May I know why we flipped the order?
Previously we were doing compaction and then clean;
here we are doing clean and then compaction.
Any specific reason? Can we add javadocs regarding any such non-trivial rationale?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -794,25 +882,31 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
   }
       partitionTypes.add(partitionType);
     });
-    // before initial commit update inflight indexes in table config
-    Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
-    inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
-    dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
-    HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-    initialCommit(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX, partitionTypes);
+
+    // before initialization set these partitions as inflight in table config
+    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionTypes);
+
+    // initialize partitions
+    initializeFromFilesystem(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX, partitionTypes, Option.empty());
   }
   /**
    * Update from {@code HoodieCommitMetadata}.
   *
    * @param commitMetadata {@code HoodieCommitMetadata}
-   * @param instantTime Timestamp at which the commit was performed
-   * @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise.
+   * @param instantTime    Timestamp at which the commit was performed
    */
   @Override
-  public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) {
-    processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(
-        engineContext, commitMetadata, instantTime, getRecordsGenerationParams()), !isTableServiceAction);
+  public void update(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
+    processAndCommit(instantTime, () -> {
+      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap =
+          HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, commitMetadata, instantTime, getRecordsGenerationParams());
+
+      // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
Review Comment:
Let's also make WriteStatus lean. I don't think we need to hold the entire
HoodieRecord in there; we just need the record key, partition path, and location
for successful records. We should fix this as a follow-up patch.
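A sketch of what such a leaner per-record write result could look like, holding only the fields named above. The class and accessor names are hypothetical illustrations, not the actual Hudi `WriteStatus` API:

```java
public class LeanWriteResult {
  private final String recordKey;
  private final String partitionPath;
  private final String fileId;      // location of the successful write
  private final String instantTime; // commit that produced the file

  public LeanWriteResult(String recordKey, String partitionPath, String fileId, String instantTime) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.fileId = fileId;
    this.instantTime = instantTime;
  }

  public String getRecordKey() { return recordKey; }
  public String getPartitionPath() { return partitionPath; }
  public String getFileId() { return fileId; }
  public String getInstantTime() { return instantTime; }

  public static void main(String[] args) {
    LeanWriteResult r = new LeanWriteResult("key1", "2023/06/01", "file-1", "20230601120000");
    System.out.println(r.getRecordKey() + " -> " + r.getPartitionPath() + "/" + r.getFileId());
  }
}
```

Dropping the full record payload from what the record index update path retains would shrink what gets shuffled and persisted to just key, partition, and location.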
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
    */
-  private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh table.
-    // this is applicable only if the table already has N commits and metadata is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
-      }
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
Review Comment:
Is there any change in this logic, or is it the same as before?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -147,84 +156,55 @@ protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configu
     this.metrics = Option.empty();
     this.enabledPartitionTypes = new ArrayList<>();
-    if (writeConfig.isMetadataTableEnabled()) {
-      this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
-      enabled = true;
-
-      // Inline compaction and auto clean is required as we do not expose this table outside
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
-          "Cleaning is controlled internally for Metadata table.");
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
-          "Compaction is controlled internally for metadata table.");
-      // Auto commit is required
-      ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
-          "Auto commit is required for Metadata Table");
-      ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
-          "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
-      // Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
-          "File listing cannot be used for Metadata Table");
-
-      this.dataMetaClient =
-          HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
-      enablePartitions();
-      initRegistry();
-      initialize(engineContext, actionMetadata, inflightInstantTimestamp);
-      initTableMetadata();
-    } else {
-      enabled = false;
+    this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+
+    if (dataMetaClient.getTableConfig().isMetadataTableEnabled() || writeConfig.isMetadataTableEnabled()) {
+      this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
+
+      try {
+        enablePartitions();
+        initRegistry();
+
+        initialized = initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp);
+
+      } catch (IOException e) {
+        LOG.error("Failed to initialize MDT", e);
Review Comment:
Let's not use shorthand in log statements:
MDT -> metadata table
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata table.
-   */
-  protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> actionMetadata,
-                                                                    Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
    * @param <T>                      - action metadata types extending Avro generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> actionMetadata,
-                                                                   Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      Option<T> actionMetadata,
+                                                                      Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length);
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+            .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
+            .forEach(partitionsToInit::add);
       }
-      return;
-    }
-    // if metadata table exists, then check if any of the enabled partition types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream()
-          .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        initMetadataReader();
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
Review Comment:
Does the SOLO_COMMIT_TIMESTAMP instantiation work well with the record index (RLI) commit time parsing
while reading records from RLI? I remember seeing warning messages in some local testing.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
                                            Option<String> inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+        || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          return Pair.of(partitionName, p.getFileNameToSizeMap());
+        })
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
+      String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+ switch (partitionType) {
+ case FILES:
+ fileGroupCountAndRecordsPair =
initializeFilesPartition(initializationTime, partitionInfoList);
+ break;
+ case BLOOM_FILTERS:
+ fileGroupCountAndRecordsPair =
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+ break;
+ case COLUMN_STATS:
+ fileGroupCountAndRecordsPair =
initializeColumnStatsPartition(partitionToFilesMap);
+ break;
+ case RECORD_INDEX:
+ fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+ break;
+ default:
+ throw new HoodieMetadataException("Unsupported MDT partition type: "
+ partitionType);
+ }
+
+ // Generate the file groups
+ final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+ ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for
MDT partition " + partitionType + " should be > 0");
+ initializeFileGroups(dataMetaClient, partitionType,
commitTimeForPartition, fileGroupCount);
+
+ // Perform the commit using bulkCommit
+ HoodieData<HoodieRecord> records =
fileGroupCountAndRecordsPair.getValue();
+ bulkCommit(commitTimeForPartition, partitionType, records,
fileGroupCount);
+ metadataMetaClient.reloadActiveTimeline();
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
partitionType, true);
+ initMetadataReader();
}
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
enabledPartitionTypes);
- initialCommit(createInstantTime, enabledPartitionTypes);
- updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
return true;
}
- private String getInitialCommitInstantTime(HoodieTableMetaClient
dataMetaClient) {
- // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP
as the instant time for initial commit
- // Otherwise, we use the timestamp of the latest completed action.
- String createInstantTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- LOG.info("Creating a new metadata table in " +
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
- return createInstantTime;
+ /**
+ * Returns a unique timestamp to use for initializing a MDT partition.
+ * <p>
+ * Since commits are immutable, we should use unique timestamps to
initialize each partition. For this, we will add a suffix to the given
initializationTime
+ * until we find a unique timestamp.
+ *
+ * @param initializationTime Timestamp from dataset to use for initialization
+ * @return a unique timestamp for MDT
+ */
+ private String generateUniqueCommitInstantTime(String initializationTime) {
+ // Add suffix to initializationTime to find an unused instant time for the
next index initialization.
+ // This function would be called multiple times in a single application if
multiple indexes are being
+ // initialized one after the other.
+ for (int offset = 0; ; ++offset) {
+ final String commitInstantTime =
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+ if
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+ return commitInstantTime;
+ }
+ }
}
- private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be initialized
as it is not enabled");
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionToFilesMap) {
+ HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams());
+
+ final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+ return Pair.of(fileGroupCount, records);
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeBloomFiltersPartition(String createInstantTime, Map<String,
Map<String, Long>> partitionToFilesMap) {
+ HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams(), createInstantTime);
+
+ final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+ return Pair.of(fileGroupCount, records);
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeRecordIndexPartition() throws IOException {
+ // Open the MDT reader to create a file system view
+ initMetadataReader();
+ final HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
+ dataMetaClient.getActiveTimeline(), metadata);
+
+ // Collect the list of latest base files present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ final List<Pair<String, String>> partitionBaseFilePairs = new
ArrayList<>();
+ for (String partition : partitions) {
+ partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+ .map(basefile -> Pair.of(partition,
basefile.getFileName())).collect(Collectors.toList()));
+ }
+ LOG.info("Initializing record index from " + partitionBaseFilePairs.size()
+ " base files in "
+ + partitions.size() + " partitions");
+
+ // Collect record keys from the files in parallel
+ HoodieData<HoodieRecord> records =
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+ records.persist("MEMORY_AND_DISK_SER");
+ final long recordCount = records.count();
+
+ // Initialize the file groups
+ final int fileGroupCount =
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
recordCount,
+ RECORD_INDEX_AVERAGE_RECORD_SIZE,
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+ dataWriteConfig.getRecordIndexMaxFileGroupCount(),
dataWriteConfig.getRecordIndexGrowthFactor(),
+ dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+ LOG.info(String.format("Initializing record index with %d mappings and %d
file groups.", recordCount, fileGroupCount));
+ return Pair.of(fileGroupCount, records);
+ }
+
+ /**
+ * Read the record keys from base files in partitions and return records.
+ */
+ private HoodieData<HoodieRecord>
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+ List<Pair<String, String>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index:
reading record keys from base files");
+ return engineContext.parallelize(partitionBaseFilePairs,
partitionBaseFilePairs.size()).flatMap(p -> {
+ final String partition = p.getKey();
+ final String filename = p.getValue();
+ Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition +
Path.SEPARATOR + filename);
+
+ final String fileId = FSUtils.getFileId(filename);
+ final String instantTime = FSUtils.getCommitTime(filename);
+ HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
dataFilePath);
+ Iterator<String> recordKeyIterator = reader.getRecordKeyIterator();
+
+ return new Iterator<HoodieRecord>() {
+ @Override
+ public boolean hasNext() {
+ return recordKeyIterator.hasNext();
+ }
+
+ @Override
+ public HoodieRecord next() {
+ return
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId,
Review Comment:
We do have a CloseableMappingIterator (or MappingIterator) that we can use here
instead of the anonymous Iterator.
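CloseableMappingIterator is Hudi-internal, so the class below is a hypothetical stand-in with the same shape (the names MappingIterator and mapper are illustrative, not Hudi's actual API). It shows how the anonymous Iterator in readRecordKeysFromBaseFiles could collapse into a one-liner:

```java
import java.util.Iterator;
import java.util.function.Function;

public class MappingIteratorSketch {
  // Hypothetical stand-in for Hudi's CloseableMappingIterator: wraps a source
  // iterator and lazily applies a mapping function to each element.
  public static class MappingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> source;
    private final Function<I, O> mapper;

    public MappingIterator(Iterator<I> source, Function<I, O> mapper) {
      this.source = source;
      this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
      return source.hasNext();
    }

    @Override
    public O next() {
      // Map one element at a time; no intermediate collection is built.
      return mapper.apply(source.next());
    }
  }
}
```

With such a wrapper, the flatMap body would reduce to returning new MappingIterator<>(recordKeyIterator, key -> HoodieMetadataPayload.createRecordIndexUpdate(key, partition, fileId, instantTime)).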
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean
isCommitRevertedByInFlightAction(
/**
* Initialize the Metadata Table by listing files and partitions from the
file system.
*
- * @param dataMetaClient - {@code HoodieTableMetaClient} for the
dataset.
+ * @param initializationTime - Timestamp to use for the commit
+ * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for
this initialization
*/
- private boolean initializeFromFilesystem(HoodieTableMetaClient
dataMetaClient,
+ private boolean initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit,
Option<String>
inflightInstantTimestamp) throws IOException {
if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
return false;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
- initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
- initTableMetadata();
- // if async metadata indexing is enabled,
- // then only initialize files partition as other partitions will be built
using HoodieIndexer
- List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
- if (dataWriteConfig.isMetadataAsyncIndex()) {
- enabledPartitionTypes.add(MetadataPartitionType.FILES);
- } else {
- // all enabled ones should be initialized
- enabledPartitionTypes = this.enabledPartitionTypes;
+ // FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+ || partitionsToInit.get(0).equals(MetadataPartitionType.FILES),
"FILES partition should be initialized first: " + partitionsToInit);
+
+ metadataMetaClient = initializeMetaClient();
+
+ // Get a complete list of files and partitions from the file system or
from already initialized FILES partition of MDT
+ boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+ List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ?
listAllPartitionsFromMDT(initializationTime) :
listAllPartitionsFromFilesystem(initializationTime);
+ Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
+ .map(p -> {
+ String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+ return Pair.of(partitionName, p.getFileNameToSizeMap());
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ for (MetadataPartitionType partitionType : partitionsToInit) {
+ // Find the commit timestamp to use for this partition. Each
initialization should use its own unique commit time.
+ String commitTimeForPartition =
generateUniqueCommitInstantTime(initializationTime);
+
+ LOG.info("Initializing MDT partition " + partitionType + " at instant "
+ commitTimeForPartition);
+
+ Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+ switch (partitionType) {
+ case FILES:
+ fileGroupCountAndRecordsPair =
initializeFilesPartition(initializationTime, partitionInfoList);
+ break;
+ case BLOOM_FILTERS:
+ fileGroupCountAndRecordsPair =
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+ break;
+ case COLUMN_STATS:
+ fileGroupCountAndRecordsPair =
initializeColumnStatsPartition(partitionToFilesMap);
+ break;
+ case RECORD_INDEX:
+ fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+ break;
+ default:
+ throw new HoodieMetadataException("Unsupported MDT partition type: "
+ partitionType);
+ }
+
+ // Generate the file groups
+ final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+ ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for
MDT partition " + partitionType + " should be > 0");
+ initializeFileGroups(dataMetaClient, partitionType,
commitTimeForPartition, fileGroupCount);
+
+ // Perform the commit using bulkCommit
+ HoodieData<HoodieRecord> records =
fileGroupCountAndRecordsPair.getValue();
+ bulkCommit(commitTimeForPartition, partitionType, records,
fileGroupCount);
+ metadataMetaClient.reloadActiveTimeline();
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
partitionType, true);
+ initMetadataReader();
}
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
enabledPartitionTypes);
- initialCommit(createInstantTime, enabledPartitionTypes);
- updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
return true;
}
- private String getInitialCommitInstantTime(HoodieTableMetaClient
dataMetaClient) {
- // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP
as the instant time for initial commit
- // Otherwise, we use the timestamp of the latest completed action.
- String createInstantTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- LOG.info("Creating a new metadata table in " +
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
- return createInstantTime;
+ /**
+ * Returns a unique timestamp to use for initializing a MDT partition.
+ * <p>
+ * Since commits are immutable, we should use unique timestamps to
initialize each partition. For this, we will add a suffix to the given
initializationTime
+ * until we find a unique timestamp.
+ *
+ * @param initializationTime Timestamp from dataset to use for initialization
+ * @return a unique timestamp for MDT
+ */
+ private String generateUniqueCommitInstantTime(String initializationTime) {
+ // Add suffix to initializationTime to find an unused instant time for the
next index initialization.
+ // This function would be called multiple times in a single application if
multiple indexes are being
+ // initialized one after the other.
+ for (int offset = 0; ; ++offset) {
+ final String commitInstantTime =
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+ if
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+ return commitInstantTime;
+ }
+ }
}
- private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be initialized
as it is not enabled");
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionToFilesMap) {
+ HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams());
+
+ final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+ return Pair.of(fileGroupCount, records);
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeBloomFiltersPartition(String createInstantTime, Map<String,
Map<String, Long>> partitionToFilesMap) {
+ HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+ engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams(), createInstantTime);
+
+ final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+ return Pair.of(fileGroupCount, records);
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeRecordIndexPartition() throws IOException {
+ // Open the MDT reader to create a file system view
+ initMetadataReader();
+ final HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
+ dataMetaClient.getActiveTimeline(), metadata);
+
+ // Collect the list of latest base files present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ final List<Pair<String, String>> partitionBaseFilePairs = new
ArrayList<>();
+ for (String partition : partitions) {
+ partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+ .map(basefile -> Pair.of(partition,
basefile.getFileName())).collect(Collectors.toList()));
+ }
+ LOG.info("Initializing record index from " + partitionBaseFilePairs.size()
+ " base files in "
+ + partitions.size() + " partitions");
+
+ // Collect record keys from the files in parallel
+ HoodieData<HoodieRecord> records =
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+ records.persist("MEMORY_AND_DISK_SER");
+ final long recordCount = records.count();
+
+ // Initialize the file groups
+ final int fileGroupCount =
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
recordCount,
+ RECORD_INDEX_AVERAGE_RECORD_SIZE,
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+ dataWriteConfig.getRecordIndexMaxFileGroupCount(),
dataWriteConfig.getRecordIndexGrowthFactor(),
+ dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+ LOG.info(String.format("Initializing record index with %d mappings and %d
file groups.", recordCount, fileGroupCount));
+ return Pair.of(fileGroupCount, records);
+ }
+
+ /**
+ * Read the record keys from base files in partitions and return records.
+ */
+ private HoodieData<HoodieRecord>
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+ List<Pair<String, String>> partitionBaseFilePairs) {
+ if (partitionBaseFilePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index:
reading record keys from base files");
+ return engineContext.parallelize(partitionBaseFilePairs,
partitionBaseFilePairs.size()).flatMap(p -> {
+ final String partition = p.getKey();
+ final String filename = p.getValue();
+ Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition +
Path.SEPARATOR + filename);
+
+ final String fileId = FSUtils.getFileId(filename);
+ final String instantTime = FSUtils.getCommitTime(filename);
+ HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(),
dataFilePath);
+ Iterator<String> recordKeyIterator = reader.getRecordKeyIterator();
+
+ return new Iterator<HoodieRecord>() {
+ @Override
+ public boolean hasNext() {
+ return recordKeyIterator.hasNext();
+ }
+
+ @Override
+ public HoodieRecord next() {
+ return
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId,
+ instantTime);
+ }
+ };
+ });
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition(String createInstantTime, List<DirectoryInfo>
partitionInfoList) {
+ // FILES partition uses a single file group
+ final int fileGroupCount = 1;
+
+ List<String> partitions = partitionInfoList.stream().map(p ->
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+ .collect(Collectors.toList());
+ final int totalDataFilesCount =
partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
+ LOG.info("Committing " + partitions.size() + " partitions and " +
totalDataFilesCount + " files to metadata"); //pwason reword
Review Comment:
Can we fix this log message? The "//pwason reword" marker looks like a leftover
TODO that should be resolved before merging.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -883,44 +977,58 @@ public void close() throws Exception {
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
*
- * @param instantTime - Action instant time for this commit
- * @param partitionRecordsMap - Map of partition name to its records to
commit
- * @param canTriggerTableService true if table services can be scheduled and
executed. false otherwise.
+ * @param instantTime - Action instant time for this commit
+ * @param partitionRecordsMap - Map of partition type to its records to
commit
*/
- protected abstract void commit(
- String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap,
- boolean canTriggerTableService);
+ protected abstract void commit(String instantTime,
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
+
+ /**
+ * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit
using bulk commit (if supported).
+ * <p>
+ * This is used to optimize the initial commit to the MDT partition which
may contains a large number of
+ * records and hence is more suited to bulkInsert for write performance.
+ *
+ * @param instantTime - Action instant time for this commit
+ * @param partitionType - The MDT partition to which records are to be
committed
+ * @param records - records to be bulk inserted
+ * @param fileGroupCount - The maximum number of file groups to which the
records will be written.
+ */
+ protected void bulkCommit(
+ String instantTime, MetadataPartitionType partitionType,
HoodieData<HoodieRecord> records,
+ int fileGroupCount) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
new HashMap<>();
+ partitionRecordsMap.put(partitionType, records);
+ commit(instantTime, partitionRecordsMap);
Review Comment:
Can we use Collections.singletonMap() here instead of allocating a HashMap for
a single entry?
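A minimal sketch of the suggestion. The enum below is a hypothetical stand-in for MetadataPartitionType (and String stands in for the records payload); only the map-construction pattern is the point:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SingletonMapSketch {
  // Hypothetical stand-in for MetadataPartitionType.
  public enum PartitionType { FILES, RECORD_INDEX }

  // Before: two statements and a mutable HashMap for a single entry.
  public static Map<PartitionType, String> withHashMap(PartitionType type, String records) {
    Map<PartitionType, String> map = new HashMap<>();
    map.put(type, records);
    return map;
  }

  // After: one expression; the returned map is also immutable, which fits
  // since commit() only reads it.
  public static Map<PartitionType, String> withSingletonMap(PartitionType type, String records) {
    return Collections.singletonMap(type, records);
  }
}
```

Both forms are equal as maps, so the swap is behavior-preserving for a read-only caller.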
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -794,25 +882,31 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
}
partitionTypes.add(partitionType);
});
- // before initial commit update inflight indexes in table config
- Set<String> inflightIndexes =
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
-
inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
-
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(),
String.join(",", inflightIndexes));
- HoodieTableConfig.update(dataMetaClient.getFs(), new
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
- initialCommit(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX,
partitionTypes);
+
+ // before initialization set these partitions as inflight in table config
+
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient,
partitionTypes);
+
+ // initialize partitions
+ initializeFromFilesystem(indexUptoInstantTime +
METADATA_INDEXER_TIME_SUFFIX, partitionTypes, Option.empty());
}
/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
- * @param instantTime Timestamp at which the commit was performed
- * @param isTableServiceAction {@code true} if commit metadata is pertaining
to a table service. {@code false} otherwise.
+ * @param instantTime Timestamp at which the commit was performed
*/
@Override
- public void update(HoodieCommitMetadata commitMetadata, String instantTime,
boolean isTableServiceAction) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(
- engineContext, commitMetadata, instantTime,
getRecordsGenerationParams()), !isTableServiceAction);
+ public void update(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus, String instantTime) {
+ processAndCommit(instantTime, () -> {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap =
+ HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
commitMetadata, instantTime, getRecordsGenerationParams());
+
+ // Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
Review Comment:
Why can't we move WriteStatus to hudi-common?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient
writeClient, String instan
// Trigger cleaning with suffixes based on the same instant time. This
ensures that any future
// delta commits synced over will not have an instant time lesser than the
last completed instant on the
// metadata table.
- writeClient.clean(instantTime + "002");
+
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
writeClient.lazyRollbackFailedIndexing();
}
/**
- * This is invoked to initialize metadata table for a dataset.
- * Initial commit has special handling mechanism due to its scale compared
to other regular commits.
- * During cold startup, the list of files to be committed can be huge.
- * So creating a HoodieCommitMetadata out of these large number of files,
- * and calling the existing update(HoodieCommitMetadata) function does not
scale well.
- * Hence, we have a special commit just for the initialization scenario.
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
*/
- private void initialCommit(String createInstantTime,
List<MetadataPartitionType> partitionTypes) {
- // List all partitions in the basePath of the containing dataset
- LOG.info("Initializing metadata table by using file listings in " +
dataWriteConfig.getBasePath());
- engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing
metadata table by listing files and partitions: " +
dataWriteConfig.getTableName());
-
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap
= new HashMap<>();
-
- // skip file system listing to populate metadata records if it's a fresh
table.
- // this is applicable only if the table already has N commits and metadata
is enabled at a later point in time.
- if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { //
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
- // If not, last completed commit in data table will be chosen as the
initial commit time.
- LOG.info("Triggering empty Commit to metadata to initialize");
- } else {
- List<DirectoryInfo> partitionInfoList =
listAllPartitions(dataMetaClient);
- Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
- .map(p -> {
- String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
- return Pair.of(partitionName, p.getFileNameToSizeMap());
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- int totalDataFilesCount =
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
- List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
- if (partitionTypes.contains(MetadataPartitionType.FILES)) {
- // Record which saves the list of all partitions
- HoodieRecord allPartitionRecord =
HoodieMetadataPayload.createPartitionListRecord(partitions);
- HoodieData<HoodieRecord> filesPartitionRecords =
getFilesPartitionRecords(createInstantTime, partitionInfoList,
allPartitionRecord);
- ValidationUtils.checkState(filesPartitionRecords.count() ==
(partitions.size() + 1));
- partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecords);
- }
-
- if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) &&
totalDataFilesCount > 0) {
- final HoodieData<HoodieRecord> recordsRDD =
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams(), createInstantTime);
- partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
recordsRDD);
- }
+ private boolean validateTimelineBeforeSchedulingCompaction(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ // we need to find if there are any inflights in data table timeline
before or equal to the latest delta commit in metadata table.
+ // Whenever you want to change this logic, please ensure all below
scenarios are considered.
+ // a. There could be a chance that latest delta commit in MDT is committed
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
+ // b. There could be DT inflights after latest delta commit in MDT and we
are ok with it. bcoz, the contract is, latest compaction instant time in MDT
represents
+ // any instants before that is already synced with metadata table.
+ // c. Do consider out of order commits. For eg, c4 from DT could complete
before c3. and we can't trigger compaction in MDT with c4 as base instant time,
until every
+ // instant before c4 is synced with metadata table.
+ List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
       .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
-    }
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
+    if (!pendingInstants.isEmpty()) {
+      checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+      LOG.info(String.format(
+          "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
+          pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
+      return false;
     }
-    commit(createInstantTime, partitionToRecordsMap, false);
+    return true;
   }

-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
-    }
+  /**
+   * Return records that represent update to the record index due to write operation on the dataset.
+   *
+   * @param writeStatuses {@code WriteStatus} from the write operation
+   */
+  private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
+    return writeStatuses.flatMap(writeStatus -> {
+      List<HoodieRecord> recordList = new LinkedList<>();
+      for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
+        if (!writeStatus.isErrored(writtenRecord.getKey())) {
+          HoodieRecord hoodieRecord;
+          HoodieKey key = writtenRecord.getKey();
+          Option<HoodieRecordLocation> newLocation = writtenRecord.getNewLocation();
+          if (newLocation.isPresent()) {
+            if (writtenRecord.getCurrentLocation() != null) {
+              // This is an update, no need to update index if the location has not changed
+              // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's
+              // instantTime refers to the current commit which was completed.
+              if (!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId())) {
+                final String msg = String.format("Detected update in location of record with key %s from %s "
Review Comment:
We might need to fix this, at least for the global index update-partition-path flow.
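To make the concern concrete: a minimal standalone sketch of the update-detection rule discussed above, extended to also flag a partition-path change (as a global index with update-partition-path enabled would need). The `Location` class here is a hypothetical stand-in for Hudi's `HoodieRecordLocation`, not the real API:

```java
import java.util.Objects;

class RecordIndexUpdateSketch {
  // A record's location: the partition and file group (fileId) it lives in.
  static final class Location {
    final String partitionPath;
    final String fileId;
    Location(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  // Returns true when the record index must be rewritten for this record.
  // Plain inserts (no current location) always need an index entry; updates
  // need one only when the fileId, or the partition path (relevant for a
  // global index updating partition paths), has changed.
  static boolean needsIndexUpdate(Location current, Location next) {
    if (current == null) {
      return true; // new insert: index entry must be created
    }
    return !Objects.equals(current.fileId, next.fileId)
        || !Objects.equals(current.partitionPath, next.partitionPath);
  }
}
```

In the diff above, only the fileId comparison is performed; the sketch shows the extra partition-path check the comment asks about.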
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
                                            Option<String> inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+        || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          return Pair.of(partitionName, p.getFileNameToSizeMap());
+        })
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
+      String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }

-  private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the next index initialization.
+    // This function would be called multiple times in a single application if multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }

-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
Review Comment:
+1
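The `generateUniqueCommitInstantTime` loop in the hunk above can be sketched standalone. Here the zero-padded three-digit suffix is an assumption standing in for `HoodieTableMetadataUtil.createIndexInitTimestamp`, and the metadata timeline is modeled as a plain `Set` of instant strings:

```java
import java.util.Set;

class UniqueInstantSketch {
  // Illustrative stand-in for createIndexInitTimestamp: append a
  // zero-padded 3-digit suffix to the base initialization time.
  static String withSuffix(String initializationTime, int offset) {
    return initializationTime + String.format("%03d", offset);
  }

  // Probe increasing suffixes until we hit an instant time that is not
  // already present on the (modeled) timeline. Commits are immutable, so
  // each partition initialization needs its own unused instant time.
  static String generateUnique(String initializationTime, Set<String> existingInstants) {
    for (int offset = 0; ; ++offset) {
      String candidate = withSuffix(initializationTime, offset);
      if (!existingInstants.contains(candidate)) {
        return candidate;
      }
    }
  }
}
```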
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -111,18 +111,27 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";

+  // Virtual keys support for metadata table. This Field is from the metadata payload schema.
+  private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;
+
+  // Average size of a record saved within the record index.
+  // Record index has a fixed size schema. This has been calculated based on experiments with default settings
+  // for block size (4MB), compression (GZ) and disabling the hudi metadata fields.
Review Comment:
Or should we set it to 4MB for the RLI partition alone, and leave it at 1MB for the other partitions in the MDT?
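To make the sizing trade-off behind the average-record-size constant concrete: a hedged sketch of how a record count, an average serialized record size, and a target file-group size could translate into a file group count. The helper and the constants used in the usage below are illustrative assumptions, not Hudi's actual sizing logic:

```java
class FileGroupCountSketch {
  // Estimate how many file groups are needed so each stays under the target
  // size: ceil(recordCount * avgRecordSizeBytes / maxFileGroupSizeBytes),
  // with a floor of 1 so there is always at least one file group.
  static int estimateFileGroupCount(long recordCount, long avgRecordSizeBytes, long maxFileGroupSizeBytes) {
    long totalBytes = recordCount * avgRecordSizeBytes;
    return (int) Math.max(1, (totalBytes + maxFileGroupSizeBytes - 1) / maxFileGroupSizeBytes);
  }
}
```

For example, with an assumed 48-byte average record and a 256MB file group target, one million keys fit in a single file group, while one hundred million need 18.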
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
                                            Option<String> inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+        || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+        .map(p -> {
+          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          return Pair.of(partitionName, p.getFileNameToSizeMap());
+        })
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
+      String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }

-  private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the next index initialization.
+    // This function would be called multiple times in a single application if multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }

-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
Review Comment:
We can file a ticket and tackle it in a follow-up patch. Let's not increase the scope of this patch; let's keep it confined to base RLI support.
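As a rough illustration of what the `getLatestBaseFiles` collection step above produces per partition, modeled here with plain collections and `(fileId, commitTime)` string pairs rather than Hudi's file system view (an assumption for the sketch):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LatestBaseFileSketch {
  // For each partition, keep only the newest base file per file group,
  // i.e. the entry with the greatest commit time for its fileId.
  // Input: partition -> list of {fileId, commitTime} pairs.
  static Map<String, Map<String, String>> latestBaseFiles(Map<String, List<String[]>> filesByPartition) {
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, List<String[]>> e : filesByPartition.entrySet()) {
      Map<String, String> latest = new HashMap<>();
      for (String[] file : e.getValue()) {
        String fileId = file[0];
        String commitTime = file[1];
        // Instant times compare lexicographically, so keep the max string.
        latest.merge(fileId, commitTime, (a, b) -> a.compareTo(b) >= 0 ? a : b);
      }
      result.put(e.getKey(), latest);
    }
    return result;
  }
}
```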
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -933,52 +1041,80 @@ protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType,
   }

   /**
-   * Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
-   *    a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
-   *
-   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
-   *    deltacommit.
+   * Optimize the metadata table by running compaction, clean and archive as required.
+   * <p>
+   * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons:
Review Comment:
Yes, it's the same as before; no major change in this logic.
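The guard being discussed (skip MDT compaction while any data-table instant at or before the latest MDT deltacommit is still inflight) can be sketched with string instant times, relying on their lexicographic ordering; the standalone class is an assumption for illustration, not Hudi's API:

```java
import java.util.List;
import java.util.stream.Collectors;

class CompactionGuardSketch {
  // Compaction of the MDT up to instant T must only run when every
  // data-table instant at or before T is complete; otherwise the compacted
  // base file would claim to cover actions that have not actually landed.
  static boolean canCompact(List<String> inflightDataInstants, String latestMdtDeltaCommit) {
    List<String> blocking = inflightDataInstants.stream()
        .filter(t -> t.compareTo(latestMdtDeltaCommit) <= 0)
        .collect(Collectors.toList());
    return blocking.isEmpty();
  }
}
```

Inflight instants with a greater instant time (e.g. a parallel multi-writer operation) do not block compaction, which mirrors the two cases the original Javadoc listed.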
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]