bvaradar commented on a change in pull request #1009: [HUDI-308] Avoid Renames for tracking state transitions of all actions on dataset
URL: https://github.com/apache/incubator-hudi/pull/1009#discussion_r354216781
 
 

 ##########
 File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 ##########
 @@ -420,17 +427,50 @@ public String getCommitActionType() {
    * @return List of Hoodie Instants generated
    * @throws IOException in case of failure
    */
-  public static List<HoodieInstant> 
scanHoodieInstantsFromFileSystem(FileSystem fs, Path metaPath,
-      Set<String> includedExtensions) throws IOException {
-    return Arrays.stream(HoodieTableMetaClient.scanFiles(fs, metaPath, path -> 
{
-      // Include only the meta files with extensions that needs to be included
-      String extension = FSUtils.getFileExtension(path.getName());
-      return includedExtensions.contains(extension);
-    })).sorted(Comparator.comparing(
-        // Sort the meta-data by the instant time (first part of the file name)
-        fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName())))
-        // create HoodieInstantMarkers from FileStatus, which extracts 
properties
-        .map(HoodieInstant::new).collect(Collectors.toList());
+  public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(
+      FileSystem fs, Path metaPath, Set<String> includedExtensions) throws 
IOException {
+    return scanHoodieInstantsFromFileSystem(fs, metaPath, includedExtensions, 
true);
+  }
+
+  /**
+   * Helper method to scan all hoodie-instant metafiles and construct 
HoodieInstant objects
+   *
+   * @param fs                 FileSystem
+   * @param metaPath           Meta Path where hoodie instants are present
+   * @param includedExtensions Included hoodie extensions
+   * @param excludeIntermediateStates If there are multiple states for the 
same action instant,
+   *                                  only include the highest state
+   * @return List of Hoodie Instants generated
+   * @throws IOException in case of failure
+   */
+  public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(
+      FileSystem fs, Path metaPath, Set<String> includedExtensions, boolean 
excludeIntermediateStates)
+      throws IOException {
+    Stream<HoodieInstant> instantStream = Arrays.stream(
+        HoodieTableMetaClient
+            .scanFiles(fs, metaPath, path -> {
+              // Include only the meta files with extensions that needs to be 
included
+              String extension = FSUtils.getFileExtension(path.getName());
+              return includedExtensions.contains(extension);
+            })).map(HoodieInstant::new);
+
+    if (excludeIntermediateStates) {
+      // Remove intermediate states for each (ts,action) pair
+      instantStream = dedupeInstants(instantStream);
+    }
+    return instantStream.sorted().collect(Collectors.toList());
+  }
+
+  public static Stream<HoodieInstant> dedupeInstants(Stream<HoodieInstant> 
instantStream) {
+    return instantStream.collect(Collectors.groupingBy(x -> 
Pair.of(x.getTimestamp(),
+        x.getAction().equals(HoodieTimeline.COMPACTION_ACTION) ? 
HoodieTimeline.COMMIT_ACTION : x.getAction())))
 
 Review comment:
   Refactored to reuse this logic.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to