manojpec commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r797380164
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
Review comment:
This might work for the FILES partition. But the bloom filter and column stats
partitions can produce a huge set of records at initialization, which can blow
up driver memory. I hit this problem as well. We need to use the engine context
to build this as a HoodieData<HoodieRecord> instead of a List, and then commit
that RDD via the commit call.
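To illustrate the memory concern, here is a minimal plain-Java sketch. It is an in-process analogy only: `java.util.stream` stands in for the engine context and `HoodieData`, which on Spark would spread record generation across executors instead of one JVM. `StatsRecord`, `RecordBuilders`, and the one-record-per-(file, column) shape are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for one metadata record (e.g. a column-stats entry).
final class StatsRecord {
  final String fileName;
  final String column;

  StatsRecord(String fileName, String column) {
    this.fileName = fileName;
    this.column = column;
  }
}

final class RecordBuilders {
  // Eager: every record is collected into one in-memory list before the
  // commit, so memory grows with (#files x #columns) on a single JVM.
  static List<StatsRecord> buildEagerly(List<String> files, List<String> columns) {
    return files.stream()
        .flatMap(f -> columns.stream().map(c -> new StatsRecord(f, c)))
        .collect(Collectors.toList());
  }

  // Lazy: only the recipe (file -> records) is built here; records are
  // produced as the consumer pulls them and are not collected into a list.
  static Stream<StatsRecord> buildLazily(List<String> files, List<String> columns) {
    return files.parallelStream()
        .flatMap(f -> columns.stream().map(c -> new StatsRecord(f, c)));
  }

  // Stand-in for the commit: it only needs to consume records one by one.
  static long commit(Stream<StatsRecord> records) {
    return records.count();
  }
}
```

The eager variant mirrors `convertMetadataFunction.convertMetadata()` returning a `List<HoodieRecord>`; the lazy variant mirrors handing the commit a description of the work rather than its materialized result.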
##########
Review comment:
And calling commit serially for each partition, one after the other, might
not be ideal. They are all different partitions under the same table, and we
should be able to commit across all of them in one shot via HoodieData.
https://github.com/apache/hudi/pull/4352 takes care of this. We can discuss more.
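A minimal sketch of the single-commit idea, assuming each record carries its own metadata partition path so that records for FILES, column stats, and bloom filters can be unioned and committed together. `MetadataRecord`, `SingleCommitWriter`, and the converter map are hypothetical; this does not reproduce how https://github.com/apache/hudi/pull/4352 implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical record that carries the metadata partition it belongs to.
final class MetadataRecord {
  final String partitionPath;
  final String key;

  MetadataRecord(String partitionPath, String key) {
    this.partitionPath = partitionPath;
    this.key = key;
  }
}

final class SingleCommitWriter {
  int commitCount = 0;
  // Record count per metadata partition written by the most recent commit.
  Map<String, Long> lastCommitSizes = new TreeMap<>();

  // One commit for the whole batch: each record already knows its partition,
  // so the writer can route records without one commit call per partition.
  void commit(List<MetadataRecord> records, String instantTime) {
    commitCount++;
    lastCommitSizes = records.stream()
        .collect(Collectors.groupingBy(r -> r.partitionPath, TreeMap::new, Collectors.counting()));
  }

  // Convert records for every partition first, then commit once.
  void processAndCommit(Map<String, Function<String, List<MetadataRecord>>> converters,
                        String instantTime) {
    List<MetadataRecord> all = new ArrayList<>();
    converters.forEach((partition, convert) -> all.addAll(convert.apply(partition)));
    commit(all, instantTime);
  }
}
```

Compared with the `forEach` loop in the diff, a failure while converting any partition happens before anything is committed, rather than after some partitions have already been written.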
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
Review comment:
But the FS check is not foolproof. The partition directory might exist well
before full initialization, right? Or do we ensure that the partition directory
is renamed into place (i.e. exists) only after successful initialization?
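One way to make the directory check trustworthy is to build the partition in a staging directory and rename it into place only after initialization succeeds, so existence implies completeness. This is a local-filesystem sketch with hypothetical names (`AtomicPartitionInit`, `Initializer`, the `.staging` suffix). Directory rename is atomic on POSIX file systems and HDFS, but not on object stores such as S3, where an explicit completion record is safer.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AtomicPartitionInit {
  // Hypothetical hook that writes the partition's initial files into a directory.
  interface Initializer {
    void writeFiles(Path dir) throws IOException;
  }

  static void initialize(Path basePath, String partition, Initializer init) throws IOException {
    Path finalDir = basePath.resolve(partition);
    Path staging = basePath.resolve("." + partition + ".staging");
    Files.createDirectories(staging);
    try {
      init.writeFiles(staging);
      // Publish in one step: finalDir appears only once all files are written.
      Files.move(staging, finalDir, StandardCopyOption.ATOMIC_MOVE);
    } catch (IOException | RuntimeException e) {
      deleteRecursively(staging); // leave no half-built partition behind
      throw e;
    }
  }

  static void deleteRecursively(Path dir) throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Children sort after their parent, so reverse order deletes files first.
      List<Path> ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
      for (Path p : ordered) {
        Files.delete(p);
      }
    }
  }
}
```

With this layout, a `fs.exists()` probe on the final partition path can only return true for a fully written partition.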
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -855,6 +855,17 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
     return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
   }
+  public Option<String> scheduleIndexing(List<String> partitions) {
Review comment:
Should the argument be a list of metadata table partition enum types instead,
to enforce that only valid/supported partitions can be indexed?
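A sketch of the typed signature the comment suggests, using a hypothetical `PartitionType` enum in place of `MetadataPartitionType` (constant names and partition paths here are illustrative). Accepting an `EnumSet` makes unsupported partitions a compile-time error; string input can still be validated up front with `valueOf`, the same call the diff uses with `p.toUpperCase(Locale.ROOT)`.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;

// Hypothetical mirror of the metadata partition enum; constant names and
// paths are illustrative, not copied from Hudi's MetadataPartitionType.
enum PartitionType {
  FILES("files"),
  COLUMN_STATS("column_stats"),
  BLOOM_FILTERS("bloom_filters");

  private final String partitionPath;

  PartitionType(String partitionPath) {
    this.partitionPath = partitionPath;
  }

  String partitionPath() {
    return partitionPath;
  }
}

final class IndexScheduler {
  // Typed argument: only enum constants can be passed, so an unsupported
  // partition is rejected at compile time. EnumSet also drops duplicates.
  static List<String> planPartitionPaths(EnumSet<PartitionType> partitions) {
    List<String> paths = new ArrayList<>();
    for (PartitionType p : partitions) {
      paths.add(p.partitionPath());
    }
    return paths;
  }

  // For string input (e.g. from a CLI), fail fast on unknown names:
  // valueOf throws IllegalArgumentException when no constant matches.
  static EnumSet<PartitionType> parse(List<String> names) {
    EnumSet<PartitionType> result = EnumSet.noneOf(PartitionType.class);
    for (String name : names) {
      result.add(PartitionType.valueOf(name.toUpperCase(Locale.ROOT)));
    }
    return result;
  }
}
```

`EnumSet` iterates in declaration order, so the resulting plan order is deterministic regardless of the order the caller lists partitions in.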
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -460,6 +475,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
         .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
     initTableMetadata();
+    // TODO: make it generic for all enabled partition types
Review comment:
This is taken care of in https://github.com/apache/hudi/pull/4352
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.all();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.all().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
Review comment:
See the comment above. We can return partitions only once they are fully
initialized.
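A minimal sketch of "return partitions only after they are fully initialized", assuming completion is recorded in a properties file that is updated only after a partition's initial commit succeeds, instead of probing the file system. `CompletedPartitionsRegistry` and the `metadata.partitions.completed` key are hypothetical, not Hudi APIs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

final class CompletedPartitionsRegistry {
  // Hypothetical property key, not an actual Hudi configuration name.
  static final String KEY = "metadata.partitions.completed";
  private final Path propsFile;

  CompletedPartitionsRegistry(Path propsFile) {
    this.propsFile = propsFile;
  }

  private Properties load() throws IOException {
    Properties props = new Properties();
    if (Files.exists(propsFile)) {
      try (InputStream in = Files.newInputStream(propsFile)) {
        props.load(in);
      }
    }
    return props;
  }

  Set<String> completed() throws IOException {
    Set<String> result = new LinkedHashSet<>();
    for (String s : load().getProperty(KEY, "").split(",")) {
      if (!s.isEmpty()) {
        result.add(s);
      }
    }
    return result;
  }

  // Call only after the partition's initial commit has succeeded.
  void markCompleted(String partition) throws IOException {
    Properties props = load();
    Set<String> done = completed();
    done.add(partition);
    props.setProperty(KEY, String.join(",", done));
    // Write a temp file, then rename it over the old one so readers never
    // see a half-written properties file.
    Path tmp = propsFile.resolveSibling(propsFile.getFileName() + ".tmp");
    try (OutputStream out = Files.newOutputStream(tmp)) {
      props.store(out, null);
    }
    Files.move(tmp, propsFile, StandardCopyOption.ATOMIC_MOVE);
  }

  // Replacement for the fs.exists() filter: keep only partitions that were
  // explicitly marked as fully initialized.
  List<String> existingPartitions(List<String> allPartitions) throws IOException {
    Set<String> done = completed();
    List<String> result = new ArrayList<>();
    for (String p : allPartitions) {
      if (done.contains(p)) {
        result.add(p);
      }
    }
    return result;
  }
}
```

With something like this, `getExistingMetadataPartitions()` could filter `MetadataPartitionType.all()` through `existingPartitions(...)`, and a partition whose directory exists but was never marked complete would simply be ignored.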
--
This is an automated message from the Apache Git Service.