rohan-uptycs commented on code in PR #8503:
URL: https://github.com/apache/hudi/pull/8503#discussion_r1186017867


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation> 
getRecordLocation(HoodieKey key) {
       }
 
       LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-          + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+              + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
       throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
     }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify the latest committed file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+            ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+    if (!instantPlanPair.isPresent()) {
+      return;
+    }
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    List<Map<String, String>> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+    partitionMapList.stream().forEach(partitionMap -> {
+      String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+      Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+      try {
+        if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+          createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+      }
+    });
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, 
getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    String metadata = "";
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from given file
+   * @param table
+   * @param metaFile
+   * @return
+   */
+  private static Option<HoodieConsistentHashingMetadata> 
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+    try {
+      if (metaFile == null) {
+        return Option.empty();
+      }
+      byte[] content = 
FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
+    } catch (FileNotFoundException e) {
+      return Option.empty();
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing metadata, for path: " + 
metaFile.getPath().getName(), e);
+      throw new HoodieIndexException("Error while loading hashing metadata", 
e);
+    }
+  }
+
+  /***
+   * COMMIT MARKER RECOVERY JOB.
+   * If a particular hashing metadata file doesn't have a commit marker then there 
could be a case where clustering is done but post commit marker
+   * creation operation failed. In this case this method will check file group 
id from consistent hashing metadata against storage base file group ids.
+   * If one of the file groups matches then we can conclude that this is the 
latest metadata file.
+   * Note: we will end up calling this method if there is no marker file and 
no replace commit on the active timeline. If a replace commit is not present on
+   * the active timeline, that means the old file group IDs from before the clustering 
operation got cleaned and only the new file group IDs of the current clustering 
operation
+   * are present on the disk.
+   * @param table

Review Comment:
   done



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -271,8 +327,114 @@ public Option<HoodieRecordLocation> 
getRecordLocation(HoodieKey key) {
       }
 
       LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
-          + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
+              + 
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", 
record_key: " + key.toString());
       throw new HoodieIndexException("Failed to getBucket as hashing node has 
no file group");
     }
   }
+
+  /***
+   * Create commit marker -> hoodieinstant.commit in metadata folder, 
consistent hashing metadata reader will use it to
+   * identify the latest committed file which will have updated commit metadata
+   * @param table
+   * @param hoodieInstant
+   */
+  public void commitIndexMetadataIfNeeded(HoodieTable table, String 
hoodieInstant) {
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+            ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(hoodieInstant));
+    if (!instantPlanPair.isPresent()) {
+      return;
+    }
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    List<Map<String, String>> partitionMapList = 
plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList());
+    partitionMapList.stream().forEach(partitionMap -> {
+      String partition = 
partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      Path metadataPartitionPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+      Path metadataFilePath = new Path(metadataPartitionPath, hoodieInstant + 
HASHING_METADATA_FILE_SUFFIX);
+      try {
+        if (table.getMetaClient().getFs().exists(metadataFilePath)) {
+          createCommitMarker(table, metadataFilePath, metadataPartitionPath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("exception while committing hashing 
metadata for path " + metadataFilePath, e);
+      }
+    });
+  }
+
+  /***
+   * Create commit marker corresponding to hashing metadata file after post 
commit clustering operation
+   * @param table
+   * @param fileStatus
+   * @param partitionPath
+   * @throws IOException
+   */
+  private static void createCommitMarker(HoodieTable table, Path fileStatus, 
Path partitionPath) throws IOException {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path fullPath = new Path(partitionPath, 
getTimestampFromFile(fileStatus.getName()) + 
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+    if (fs.exists(fullPath)) {
+      return;
+    }
+    String metadata = "";
+    FileIOUtils.createFileInPath(fs, fullPath, Option.of(metadata.getBytes()));
+  }
+
+  /***
+   * Load consistent hashing metadata from given file
+   * @param table

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to