rohan-uptycs commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1186017867
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation>
getRecordLocation(HoodieKey key) {
}
LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
+ +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
}
}
+
+ /***
+ * Create commit marker -> hoodieinstant.commit in metadata folder,
consistent hashing metadata reader will use it to
+ * identify latest committed file which will have updated commit metadata
+ * @param table
+ * @param hoodieInstant
+ */
+ public void commitIndexMetadataIfNeeded(HoodieTable table, String
hoodieInstant) {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+ ClusteringUtils.getClusteringPlan(table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+ if (!instantPlanPair.isPresent()) {
+ return;
+ }
+ HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+ List<Map<String, String>> partitionMapList =
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+ partitionMapList.stream().forEach(partitionMap -> {
+ String partition =
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ Path metadataPartitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
+ Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant +
HASHING_METADATA_FILE_SUFFIX);
+ try {
+ if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+ createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("exception while committing hashing
metadata for path " + metadataFilePath, e);
+ }
+ });
+ }
+
+ /***
+ * Create commit marker corresponding to hashing metadata file after post
commit clustering operation
+ * @param table
+ * @param fileStatus
+ * @param partitionPath
+ * @throws IOException
+ */
+ private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
+ HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+ Path fullPath = new Path(partitionPath,
getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ if (fs.exists(fullPath)) {
+ return;
+ }
+ String metadata = "";
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+ }
+
+ /***
+ * Load consistent hashing metadata from given file
+ * @param table
+ * @param metaFile
+ * @return
+ */
+ private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+ try {
+ if (metaFile == null) {
+ return Option.empty();
+ }
+ byte[] content =
FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+ return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+ } catch (FileNotFoundException e) {
+ return Option.empty();
+ } catch (IOException e) {
+ LOG.error("Error when loading hashing metadata, for path: " +
metaFile.getPath().getName(), e);
+ throw new HoodieIndexException("Error while loading hashing metadata",
e);
+ }
+ }
+
+ /***
+ * COMMIT MARKER RECOVERY JOB.
+ * If particular hashing metadata file doesn't have commit marker then there
could be a case where clustering is done but post commit marker
+ * creation operation failed. In this case this method will check file group
id from consistent hashing metadata against storage base file group ids.
+ * If one of the file groups matches then we can conclude that this is the
latest metadata file.
+ * Note : we will end up calling this method if there is no marker file and
no replace commit on active timeline, if replace commit is not present on
+ * active timeline that means old file group IDs before clustering
operation got cleaned and only new file group IDs of current clustering
operation
+ * are present on the disk.
+ * @param table
Review Comment:
done
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation>
getRecordLocation(HoodieKey key) {
}
LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
+ +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
}
}
+
+ /***
+ * Create commit marker -> hoodieinstant.commit in metadata folder,
consistent hashing metadata reader will use it to
+ * identify latest committed file which will have updated commit metadata
+ * @param table
+ * @param hoodieInstant
+ */
+ public void commitIndexMetadataIfNeeded(HoodieTable table, String
hoodieInstant) {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+ ClusteringUtils.getClusteringPlan(table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+ if (!instantPlanPair.isPresent()) {
+ return;
+ }
+ HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+ List<Map<String, String>> partitionMapList =
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+ partitionMapList.stream().forEach(partitionMap -> {
+ String partition =
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ Path metadataPartitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
+ Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant +
HASHING_METADATA_FILE_SUFFIX);
+ try {
+ if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+ createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("exception while committing hashing
metadata for path " + metadataFilePath, e);
+ }
+ });
+ }
+
+ /***
+ * Create commit marker corresponding to hashing metadata file after post
commit clustering operation
+ * @param table
+ * @param fileStatus
+ * @param partitionPath
+ * @throws IOException
+ */
+ private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
+ HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+ Path fullPath = new Path(partitionPath,
getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ if (fs.exists(fullPath)) {
+ return;
+ }
+ String metadata = "";
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+ }
+
+ /***
+ * Load consistent hashing metadata from given file
+ * @param table
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]