manojpec commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r799116647
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -621,8 +635,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     LOG.info(String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s",
         fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime));
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), metadataPartition.getPartitionPath());
     for (int i = 0; i < fileGroupCount; ++i) {
       final String fileGroupFileId = String.format("%s%04d", metadataPartition.getFileIdPrefix(), i);
+      // if a writer or async indexer had already initialized the filegroup then continue
+      if (!fileSlices.isEmpty() && fileSlices.stream().anyMatch(fileSlice -> fileGroupFileId.equals(fileSlice.getFileGroupId().getFileId()))) {
Review comment:
We should not get into this case. Can you please explain how this can happen? Either all file groups are initialized or none are. This block gives the impression that a partition can be partially initialized and that the missing file groups get initialized here, which shouldn't happen, right?
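If the guard is only meant to defend against re-running initialization as a whole, a single up-front check would make the all-or-nothing assumption explicit. Rough sketch only (untested, reusing the fsView/fileSlices introduced in this diff):

```java
// sketch: treat initialization as all-or-nothing and bail out once,
// instead of guarding each fileGroupFileId inside the loop
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(
    metadataMetaClient, Option.ofNullable(fsView), metadataPartition.getPartitionPath());
if (!fileSlices.isEmpty()) {
  // a writer or the async indexer has already initialized this partition's file groups
  LOG.info("File groups already exist for partition " + metadataPartition.getPartitionPath());
  return;
}
for (int i = 0; i < fileGroupCount; ++i) {
  // ... existing file group creation ...
}
```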
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -383,6 +391,12 @@ public void initTableMetadata() {
     }
     if (!exists) {
+      if (metadataWriteConfig.isMetadataAsyncIndex()) {
+        // with async metadata indexing enabled, there can be inflight writers
+        // TODO: schedule indexing only for enabled partition types
Review comment:
There will be more merge conflicts with https://github.com/apache/hudi/pull/4746. Better to rebase sooner rather than later. Also, that PR takes care of init for all enabled partitions.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
+    }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.all();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.all().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
+          } catch (IOException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+      String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
+      try {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.MERGE_ON_READ)
+            .setTableName(tableName)
+            .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+            .setPayloadClassName(HoodieMetadataPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+            .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        initTableMetadata();
+        initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)), indexUptoInstantTime, 1);
+      } catch (IOException e) {
+        throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, indexUptoInstant: %s",
+            relativePartitionPath, indexUptoInstantTime));
+      }
+
+      // List all partitions in the basePath of the containing dataset
+      LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "MetadataIndex: initializing metadata table by listing files and partitions");
+      List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+
+      // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
+      // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
+      // Hence, we have a special commit just for the bootstrap scenario.
+      bootstrapCommit(dirInfoList, indexUptoInstantTime, relativePartitionPath);
Review comment:
Please take a look at https://github.com/apache/hudi/pull/4746 for this
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -480,6 +482,25 @@ public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                                  boolean deleteInstants,
                                                  boolean skipLocking);
+  /**
+   * Schedules Indexing for the table to the given instant.
+   *
+   * @param context HoodieEngineContext
+   * @param indexInstantTime Instant time for scheduling index action.
+   * @param partitionsToIndex List of {@link MetadataPartitionType#partitionPath()} that should be indexed.
+   * @return HoodieIndexPlan containing metadata partitions and instant upto which they should be indexed.
+   */
+  public abstract Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex);
+
+  /**
+   * Execute requested index action.
+   *
+   * @param context HoodieEngineContext
+   * @param indexInstantTime Instant time for which index action was scheduled.
+   * @return HoodieIndexCommitMetadata containing write stats for each metadata partition.
+   */
+  public abstract Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime);
Review comment:
nit: to pair well with scheduleIndex() and avoid confusion, how about runIndex() or something similar?
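e.g. the two would then read as a natural pair at the call site (hypothetical usage, assuming some HoodieTable instance named `table`):

```java
// schedule the index action, then execute it
Option<HoodieIndexPlan> plan = table.scheduleIndex(context, indexInstantTime, partitionsToIndex);
Option<HoodieIndexCommitMetadata> result = table.runIndex(context, indexInstantTime);
```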
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -652,20 +672,99 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
Review comment:
convertMetadata() already returns a map from every enabled partition to the records to be committed, and here it is called in the outer loop once per enabled partition, which will produce duplicates. Shouldn't convertMetadataFunction itself be doing the getMetadataPartitionsToUpdate() filtering instead?
Doing multiple commits to the metadata table will also introduce a whole set of new cases to consider. We can avoid that by keeping the current model: a single commit carrying every partition's HoodieData<>.
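Something along these lines, perhaps (rough sketch, untested; the filtering could equally live inside convertMetadata() itself):

```java
// sketch: keep the single-commit model by filtering the converted map down
// to the partitions that actually need updating, then committing once
if (enabled && metadata != null) {
  List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
  Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
      convertMetadataFunction.convertMetadata().entrySet().stream()
          .filter(e -> partitionsToUpdate.contains(e.getKey().partitionPath()))
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  commit(instantTime, partitionRecordsMap, canTriggerTableService);
}
```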
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -118,18 +124,18 @@
 /**
  * Hudi backed table metadata writer.
  *
- * @param hadoopConf               - Hadoop configuration to use for the metadata writer
- * @param writeConfig              - Writer config
- * @param engineContext            - Engine context
- * @param actionMetadata           - Optional action metadata to help decide bootstrap operations
- * @param <T>                      - Action metadata types extending Avro generated SpecificRecordBase
+ * @param hadoopConf - Hadoop configuration to use for the metadata writer
+ * @param writeConfig - Writer config
+ * @param engineContext - Engine context
+ * @param actionMetadata - Optional action metadata to help decide bootstrap operations
+ * @param <T> - Action metadata types extending Avro generated SpecificRecordBase
  * @param inflightInstantTimestamp - Timestamp of any instant in progress
  */
 protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
-                                                                         HoodieWriteConfig writeConfig,
-                                                                         HoodieEngineContext engineContext,
-                                                                         Option<T> actionMetadata,
-                                                                         Option<String> inflightInstantTimestamp) {
+        HoodieWriteConfig writeConfig,
Review comment:
nit: can we avoid reformatting these lines when there are no content changes to them? Is format-on-save applying Hudi style changes here?