SteNicholas commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1185996328
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation>
getRecordLocation(HoodieKey key) {
}
LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
+ +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
}
}
+
+ /***
+ * Create commit marker -> hoodieinstant.commit in metadata folder,
consistent hashing metadata reader will use it to
+ * identify latest commited file which will have updated commit metadata
+ * @param table
+ * @param hoodieInstant
+ */
+ public void commitIndexMetadataIfNeeded(HoodieTable table, String
hoodieInstant) {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+ ClusteringUtils.getClusteringPlan(table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+ if (!instantPlanPair.isPresent()) {
+ return;
+ }
+ HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+ List<Map<String, String>> partitionMapList =
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+ partitionMapList.stream().forEach(partitionMap -> {
+ String partition =
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ Path metadataPartitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
+ Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant +
HASHING_METADATA_FILE_SUFFIX);
+ try {
+ if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+ createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("exception while committing hashing
metadata for path " + metadataFilePath, e);
+ }
+ });
+ }
+
+ /***
+ * Create commit marker corresponding to hashing metadata file after post
commit clustering operation
+ * @param table
+ * @param fileStatus
+ * @param partitionPath
+ * @throws IOException
+ */
+ private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
+ HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+ Path fullPath = new Path(partitionPath,
getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ if (fs.exists(fullPath)) {
+ return;
+ }
+ String metadata = "";
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+ }
+
+ /***
+ * Load consistent hashing metadata from given file
Review Comment:
```suggestion
* Loads consistent hashing metadata of table from the given meta file.
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation>
getRecordLocation(HoodieKey key) {
}
LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
+ +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
}
}
+
+ /***
+ * Create commit marker -> hoodieinstant.commit in metadata folder,
consistent hashing metadata reader will use it to
+ * identify latest commited file which will have updated commit metadata
+ * @param table
+ * @param hoodieInstant
+ */
+ public void commitIndexMetadataIfNeeded(HoodieTable table, String
hoodieInstant) {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+ ClusteringUtils.getClusteringPlan(table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+ if (!instantPlanPair.isPresent()) {
+ return;
+ }
+ HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+ List<Map<String, String>> partitionMapList =
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+ partitionMapList.stream().forEach(partitionMap -> {
+ String partition =
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ Path metadataPartitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
+ Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant +
HASHING_METADATA_FILE_SUFFIX);
+ try {
+ if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+ createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("exception while committing hashing
metadata for path " + metadataFilePath, e);
+ }
+ });
+ }
+
+ /***
+ * Create commit marker corresponding to hashing metadata file after post
commit clustering operation
+ * @param table
+ * @param fileStatus
+ * @param partitionPath
+ * @throws IOException
+ */
+ private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
+ HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+ Path fullPath = new Path(partitionPath,
getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ if (fs.exists(fullPath)) {
+ return;
+ }
+ String metadata = "";
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+ }
+
+ /***
+ * Load consistent hashing metadata from given file
+ * @param table
Review Comment:
Please add Javadoc comments for all of the parameters.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]