This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3c02f  HUDI-162 : File System view must be built with correct 
timeline actions
ae3c02f is described below

commit ae3c02fb3fff47da8f20c4f9c8012a257a3df792
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Mon Jul 1 18:19:12 2019 -0700

    HUDI-162 : File System view must be built with correct timeline actions
---
 .../uber/hoodie/cli/commands/RepairsCommand.java   |  2 +-
 .../com/uber/hoodie/io/HoodieAppendHandle.java     |  2 +-
 .../uber/hoodie/table/HoodieCopyOnWriteTable.java  |  4 +--
 .../uber/hoodie/table/HoodieMergeOnReadTable.java  |  2 +-
 .../hoodie/common/model/HoodieCommitMetadata.java  |  4 +--
 .../uber/hoodie/common/table/HoodieTimeline.java   |  6 ++++
 .../table/timeline/HoodieDefaultTimeline.java      |  8 +++++
 .../table/view/AbstractTableFileSystemView.java    | 38 ++++++++++++++++------
 .../table/view/HoodieTableFileSystemView.java      |  1 -
 .../IncrementalTimelineSyncFileSystemView.java     | 16 ++++++++-
 .../table/view/RocksDbBasedFileSystemView.java     |  3 +-
 .../table/view/HoodieTableFileSystemViewTest.java  | 32 ++++++++++++++----
 12 files changed, 91 insertions(+), 27 deletions(-)

diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
index 839cd5c..8967f2b 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
@@ -86,7 +86,7 @@ public class RepairsCommand implements CommandMarker {
 
     int ind = 0;
     for (String partition : partitionPaths) {
-      Path partitionPath = new Path(basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(basePath, partition);
       String[] row = new String[3];
       row[0] = partition;
       row[1] = "Yes";
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index cc98f56..99d3c63 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -274,7 +274,7 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieWri
     Optional<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
 
     return HoodieLogFormat.newWriterBuilder()
-        .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), 
partitionPath))
+        
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
         .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
             
latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
         .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 6b9f4f6..e1c4b99 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -315,7 +315,7 @@ public class HoodieCopyOnWriteTable<T extends 
HoodieRecordPayload> extends Hoodi
       throws IOException {
     logger.info("Cleaning path " + partitionPath);
     FileSystem fs = getMetaClient().getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), 
partitionPath), filter);
+    FileStatus[] toBeDeleted = 
fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), 
filter);
     for (FileStatus file : toBeDeleted) {
       boolean success = fs.delete(file.getPath(), false);
       results.put(file, success);
@@ -340,7 +340,7 @@ public class HoodieCopyOnWriteTable<T extends 
HoodieRecordPayload> extends Hoodi
       }
       return false;
     };
-    FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), 
partitionPath), filter);
+    FileStatus[] toBeDeleted = 
fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), 
filter);
     for (FileStatus file : toBeDeleted) {
       boolean success = fs.delete(file.getPath(), false);
       results.put(file, success);
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 589b709..af45052 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -507,7 +507,7 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
           String baseCommitTime = 
fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
           try {
             writer = HoodieLogFormat.newWriterBuilder().onParentPath(
-                new Path(this.getMetaClient().getBasePath(), partitionPath))
+                FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), 
partitionPath))
                 .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
                 .withFs(this.metaClient.getFs())
                 .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
index 42ad091..b76ba46 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.uber.hoodie.common.util.FSUtils;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -30,7 +31,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -107,7 +107,7 @@ public class HoodieCommitMetadata implements Serializable {
     HashMap<String, String> fullPaths = new HashMap<>();
     for (Map.Entry<String, String> entry : 
getFileIdAndRelativePaths().entrySet()) {
       String fullPath =
-          (entry.getValue() != null) ? (new Path(basePath, 
entry.getValue())).toString() : null;
+          (entry.getValue() != null) ? (FSUtils.getPartitionPath(basePath, 
entry.getValue())).toString() : null;
       fullPaths.put(entry.getKey(), fullPath);
     }
     return fullPaths;
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
index 24cab8d..642c08d 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
@@ -106,6 +106,12 @@ public interface HoodieTimeline extends Serializable {
   HoodieTimeline filterCompletedAndCompactionInstants();
 
   /**
+   *  Timeline to just include commits (commit/deltacommit) and compaction 
actions
+   * @return
+   */
+  HoodieTimeline getCommitsAndCompactionTimeline();
+
+  /**
    * Filter this timeline to just include requested and inflight compaction 
instants
    * @return
    */
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
index d77b9fe..ed80c3c 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie.common.table.timeline;
 
+import com.google.common.collect.Sets;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.util.StringUtils;
 import com.uber.hoodie.exception.HoodieException;
@@ -25,6 +26,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -103,6 +105,12 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
   }
 
   @Override
+  public HoodieTimeline getCommitsAndCompactionTimeline() {
+    Set<String> validActions = Sets.newHashSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, COMPACTION_ACTION);
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> 
validActions.contains(s.getAction())), details);
+  }
+
+  @Override
   public HoodieTimeline filterPendingCompactionTimeline() {
     return new HoodieDefaultTimeline(
         instants.stream().filter(s -> 
s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)),
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index 8b05b69..62410ac 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -71,8 +71,8 @@ public abstract class AbstractTableFileSystemView implements 
SyncableFileSystemV
 
   protected HoodieTableMetaClient metaClient;
 
-  // This is the commits that will be visible for all views extending this view
-  protected HoodieTimeline visibleActiveTimeline;
+  // This is the commits timeline that will be visible for all views extending 
this view
+  private HoodieTimeline visibleCommitsAndCompactionTimeline;
 
   // Used to concurrently load and populate partition views
   private ConcurrentHashMap<String, Boolean> addedPartitions = new 
ConcurrentHashMap<>(4096);
@@ -92,7 +92,8 @@ public abstract class AbstractTableFileSystemView implements 
SyncableFileSystemV
    */
   protected void init(HoodieTableMetaClient metaClient, HoodieTimeline 
visibleActiveTimeline) {
     this.metaClient = metaClient;
-    this.visibleActiveTimeline = visibleActiveTimeline;
+    refreshTimeline(visibleActiveTimeline);
+
     // Load Pending Compaction Operations
     resetPendingCompactionOperations(
         CompactionUtils.getAllPendingCompactionOperations(metaClient).values()
@@ -101,11 +102,19 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   /**
+   * Refresh commits timeline
+   * @param visibleActiveTimeline  Visible Active Timeline
+   */
+  protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
+    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+  }
+
+  /**
    * Adds the provided statuses into the file system view, and also caches it 
inside this object.
    */
   protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, 
visibleActiveTimeline, true);
+    List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, 
visibleCommitsAndCompactionTimeline, true);
     long fgBuildTimeTakenMs = timer.endTimer();
     timer.startTimer();
    // Group by partition for efficient updates for both InMemory and 
DiskBased structures.
@@ -133,7 +142,6 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
   protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> 
dataFileStream,
       Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean 
addPendingCompactionFileSlice) {
-
     Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = dataFileStream
         .collect(Collectors.groupingBy((dataFile) -> {
           String partitionPathStr = 
getPartitionPathFromFilePath(dataFile.getPath());
@@ -187,7 +195,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       resetViewState();
 
       // Initialize with new Hoodie timeline.
-      init(metaClient, visibleActiveTimeline);
+      init(metaClient, getTimeline());
     } finally {
       writeLock.unlock();
     }
@@ -288,6 +296,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
     Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
         getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
+    log.info("Pending Compaction instant for (" + fileSlice + ") is :" + 
compactionWithInstantTime);
     return (compactionWithInstantTime.isPresent())
         && 
fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
   }
@@ -300,6 +309,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    */
   protected FileSlice filterDataFileAfterPendingCompaction(FileSlice 
fileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
+      log.info("File Slice (" + fileSlice + ") is in pending compaction");
       // Data file is filtered out of the file-slice as the corresponding 
compaction
       // instant not completed yet.
       FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
@@ -417,7 +427,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchAllDataFiles(partitionPath)
-          .filter(df -> 
visibleActiveTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
+          .filter(df -> 
visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
           .filter(df -> !isDataFileDueToPendingCompaction(df));
     } finally {
       readLock.unlock();
@@ -794,12 +804,12 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
   @Override
   public Option<HoodieInstant> getLastInstant() {
-    return Option.fromJavaOptional(visibleActiveTimeline.lastInstant());
+    return Option.fromJavaOptional(getTimeline().lastInstant());
   }
 
   @Override
   public HoodieTimeline getTimeline() {
-    return visibleActiveTimeline;
+    return visibleCommitsAndCompactionTimeline;
   }
 
   @Override
@@ -822,10 +832,18 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    * @param newTimeline New Hoodie Timeline
    */
   protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline 
newTimeline) {
-    visibleActiveTimeline = newTimeline;
+    refreshTimeline(newTimeline);
     addedPartitions.clear();
     resetViewState();
     // Initialize with new Hoodie timeline.
     init(metaClient, newTimeline);
   }
+
+  /**
+   * Return Only Commits and Compaction timeline for building file-groups
+   * @return
+   */
+  public HoodieTimeline getVisibleCommitsAndCompactionTimeline() {
+    return visibleCommitsAndCompactionTimeline;
+  }
 }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
index 2e52b9f..741d7ee 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
@@ -198,7 +198,6 @@ public class HoodieTableFileSystemView extends 
IncrementalTimelineSyncFileSystem
     partitionToFileGroupsMap.put(partitionPath, newList);
   }
 
-
   @Override
   public Stream<HoodieFileGroup> fetchAllStoredFileGroups() {
     return partitionToFileGroupsMap.values().stream().flatMap(fg -> {
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 76e7a53..c087de0 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -56,11 +56,20 @@ public abstract class IncrementalTimelineSyncFileSystemView 
extends AbstractTabl
   // Allows incremental Timeline syncing
   private final boolean incrementalTimelineSyncEnabled;
 
+  // This is the visible active timeline used only for incremental view syncing
+  private HoodieTimeline visibleActiveTimeline;
+
   protected IncrementalTimelineSyncFileSystemView(boolean 
enableIncrementalTimelineSync) {
     this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync;
   }
 
   @Override
+  protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
+    this.visibleActiveTimeline = visibleActiveTimeline;
+    super.refreshTimeline(visibleActiveTimeline);
+  }
+
+  @Override
   protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline 
newTimeline) {
     try {
       if (incrementalTimelineSyncEnabled) {
@@ -70,7 +79,7 @@ public abstract class IncrementalTimelineSyncFileSystemView 
extends AbstractTabl
           runIncrementalSync(newTimeline, diffResult);
           log.info("Finished incremental sync");
           // Reset timeline to latest
-          visibleActiveTimeline = newTimeline;
+          refreshTimeline(newTimeline);
           return;
         }
       }
@@ -337,4 +346,9 @@ public abstract class IncrementalTimelineSyncFileSystemView 
extends AbstractTabl
         buildFileGroups(viewDataFiles.values().stream(), 
viewLogFiles.values().stream(), timeline, true);
     storePartitionView(partition, fgs);
   }
+
+  @Override
+  public HoodieTimeline getTimeline() {
+    return visibleActiveTimeline;
+  }
 }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
index 9edbb94..ab598e8 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
@@ -319,7 +319,8 @@ public class RocksDbBasedFileSystemView extends 
IncrementalTimelineSyncFileSyste
     return sliceStream.map(s -> Pair.of(Pair.of(s.getPartitionPath(), 
s.getFileId()), s))
         
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream().map(slicePair 
-> {
           HoodieFileGroup fg =
-              new HoodieFileGroup(slicePair.getKey().getKey(), 
slicePair.getKey().getValue(), visibleActiveTimeline);
+              new HoodieFileGroup(slicePair.getKey().getKey(), 
slicePair.getKey().getValue(),
+                  getVisibleCommitsAndCompactionTimeline());
           slicePair.getValue().forEach(e -> fg.addFileSlice(e.getValue()));
           return fg;
         });
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index 4cae7a6..7a21a3d 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -103,8 +103,7 @@ public class HoodieTableFileSystemViewTest {
       fsView.close();
       fsView = null;
     }
-    fsView = getFileSystemView(
-        
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
+    fsView = 
getFileSystemView(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
     roView = (TableFileSystemView.ReadOptimizedView) fsView;
     rtView = (TableFileSystemView.RealtimeView) fsView;
   }
@@ -615,10 +614,12 @@ public class HoodieTableFileSystemViewTest {
     // Put some files in the partition
     String fullPartitionPath = basePath + "/2016/05/01/";
     new File(fullPartitionPath).mkdirs();
-    String commitTime1 = "1";
-    String commitTime2 = "2";
-    String commitTime3 = "3";
-    String commitTime4 = "4";
+    String cleanTime1 = "1";
+    String commitTime1 = "2";
+    String commitTime2 = "3";
+    String commitTime3 = "4";
+    String commitTime4 = "5";
+
     String fileId1 = UUID.randomUUID().toString();
     String fileId2 = UUID.randomUUID().toString();
     String fileId3 = UUID.randomUUID().toString();
@@ -640,11 +641,29 @@ public class HoodieTableFileSystemViewTest {
     new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, 
HoodieLogFile.DELTA_EXTENSION,
         commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
 
+    // Create commit/clean files
+    new File(basePath + "/.hoodie/" + cleanTime1 + ".clean").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile();
 
+    testStreamLatestVersionInPartition(isLatestFileSliceOnly, 
fullPartitionPath, commitTime1, commitTime2, commitTime3,
+        commitTime4, fileId1, fileId2, fileId3, fileId4);
+
+    // Now create a scenario where archiving deleted commits (1, 2, and 3) but 
retained the clean action clean1. Now clean1 is
+    // the lowest commit time. Scenario for HUDI-162 - here clean is the 
earliest action in the active timeline
+    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").delete();
+    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").delete();
+    new File(basePath + "/.hoodie/" + commitTime3 + ".commit").delete();
+    testStreamLatestVersionInPartition(isLatestFileSliceOnly, 
fullPartitionPath, commitTime1, commitTime2, commitTime3,
+        commitTime4, fileId1, fileId2, fileId3, fileId4);
+  }
+
+  private void testStreamLatestVersionInPartition(boolean 
isLatestFileSliceOnly, String fullPartitionPath,
+      String commitTime1, String commitTime2, String commitTime3, String 
commitTime4, String fileId1, String fileId2,
+      String fileId3, String fileId4) throws IOException {
+
     // Now we list the entire partition
     FileStatus[] statuses = metaClient.getFs().listStatus(new 
Path(fullPartitionPath));
     assertEquals(11, statuses.length);
@@ -711,7 +730,6 @@ public class HoodieTableFileSystemViewTest {
     assertEquals(logFilesList.size(), 1);
     assertTrue(logFilesList.get(0).getFileName()
         .equals(FSUtils.makeLogFileName(fileId2, 
HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)));
-
   }
 
   @Test

Reply via email to