This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1483f2  HUDI-148 Small File selection logic for MOR must skip fileIds selected for pending compaction correctly
a1483f2 is described below

commit a1483f2c5f3b921d1117d31f75453e45e5717259
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Sat Jun 8 12:40:08 2019 -0700

    HUDI-148 Small File selection logic for MOR must skip fileIds selected for pending compaction correctly
---
 .../main/java/com/uber/hoodie/CompactionAdminClient.java    |  2 +-
 .../java/com/uber/hoodie/table/HoodieMergeOnReadTable.java  |  7 ++++---
 .../src/test/java/com/uber/hoodie/TestCleaner.java          |  2 +-
 .../java/com/uber/hoodie/TestCompactionAdminClient.java     |  4 ++--
 .../com/uber/hoodie/common/table/TableFileSystemView.java   | 10 ++++++----
 .../common/table/view/AbstractTableFileSystemView.java      | 12 +++++++++---
 .../common/table/view/PriorityBasedFileSystemView.java      |  7 ++++---
 .../common/table/view/RemoteHoodieTableFileSystemView.java  |  9 +++++++--
 .../common/table/view/HoodieTableFileSystemViewTest.java    | 13 +++++++++----
 .../uber/hoodie/timeline/service/FileSystemViewHandler.java |  3 ++-
 .../hoodie/timeline/service/handlers/FileSliceHandler.java  |  6 +++---
 11 files changed, 48 insertions(+), 27 deletions(-)

diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 34f7dc1..751e7dc 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -442,7 +442,7 @@ public class CompactionAdminClient extends 
AbstractHoodieClient {
                     .sorted(HoodieLogFile.getLogFileComparator())
                     .collect(Collectors.toList());
     FileSlice fileSliceForCompaction =
-        
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), 
operation.getBaseInstantTime())
+        
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), 
operation.getBaseInstantTime(), true)
             .filter(fs -> 
fs.getFileId().equals(operation.getFileId())).findFirst().get();
     int maxUsedVersion =
         
fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion)
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index bcc2190..589b709 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -384,7 +384,7 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
           // TODO : choose last N small files since there can be multiple 
small files written to a single partition
           // by different spark partitions in a single batch
           Optional<FileSlice> smallFileSlice = getRTFileSystemView()
-              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp()).filter(
+              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false).filter(
                   fileSlice -> fileSlice.getLogFiles().count() < 1
                       && fileSlice.getDataFile().get().getFileSize() < config
                       .getParquetSmallFileLimit()).sorted((FileSlice left, 
FileSlice right) ->
@@ -394,9 +394,10 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
             allSmallFileSlices.add(smallFileSlice.get());
           }
         } else {
-          // If we can index log files, we can add more inserts to log files.
+          // If we can index log files, we can add more inserts to log files 
for fileIds including those under
+          // pending compaction.
           List<FileSlice> allFileSlices = getRTFileSystemView()
-              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp())
+              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), true)
               .collect(Collectors.toList());
           for (FileSlice fileSlice : allFileSlices) {
             if (isSmallFile(partitionPath, fileSlice)) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index afecd69..7798805 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -865,7 +865,7 @@ public class TestCleaner extends TestHoodieClientBase {
       String baseInstantForCompaction = 
fileIdToLatestInstantBeforeCompaction.get(fileId);
       Optional<FileSlice> fileSliceForCompaction =
           
hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH,
-              baseInstantForCompaction).filter(fs -> 
fs.getFileId().equals(fileId)).findFirst();
+              baseInstantForCompaction, true).filter(fs -> 
fs.getFileId().equals(fileId)).findFirst();
       Assert.assertTrue("Base Instant for Compaction must be preserved", 
fileSliceForCompaction.isPresent());
       Assert.assertTrue("FileSlice has data-file", 
fileSliceForCompaction.get().getDataFile().isPresent());
       Assert.assertEquals("FileSlice has log-files", 2,
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
index 91f2d33..4509db6 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
@@ -280,7 +280,7 @@ public class TestCompactionAdminClient extends 
TestHoodieClientBase {
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to 
contain no new Log files
-    
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 compactionInstant)
+    
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 compactionInstant, true)
         .filter(fs -> 
fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
           Assert.assertFalse("No Data file must be present", 
fs.getDataFile().isPresent());
           Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
@@ -342,7 +342,7 @@ public class TestCompactionAdminClient extends 
TestHoodieClientBase {
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to 
contain no new Log files
-    
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 compactionInstant)
+    
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
 compactionInstant, true)
         .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
         .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
           Assert.assertFalse("No Data file must be present", 
fs.getDataFile().isPresent());
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
index 01ff0a5..5605e3a 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
@@ -104,13 +104,15 @@ public interface TableFileSystemView {
      */
     Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath);
 
-
     /**
-     * Stream all the latest file slices in the given partition with 
precondition that
-     * commitTime(file) before maxCommitTime
+     * Stream all latest file slices in  given partition with precondition 
that commitTime(file) before maxCommitTime
+     *
+     * @param partitionPath Partition path
+     * @param maxCommitTime Max Instant Time
+     * @param includeFileSlicesInPendingCompaction include file-slices that 
are in pending compaction
      */
     Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
-        String maxCommitTime);
+        String maxCommitTime, boolean includeFileSlicesInPendingCompaction);
 
     /**
      * Stream all "merged" file-slices before on an instant time
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index d9a5abe..129a584 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -476,13 +476,19 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   @Override
-  public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String 
partitionStr, String maxCommitTime) {
+  public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String 
partitionStr, String maxCommitTime,
+      boolean includeFileSlicesInPendingCompaction) {
     try {
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .map(fs -> filterDataFileAfterPendingCompaction(fs));
+      Stream<FileSlice> fileSliceStream =
+          fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime);
+      if (includeFileSlicesInPendingCompaction) {
+        return fileSliceStream.map(fs -> 
filterDataFileAfterPendingCompaction(fs));
+      } else {
+        return fileSliceStream.filter(fs -> 
!isPendingCompactionScheduledForFileId(fs.getFileGroupId()));
+      }
     } finally {
       readLock.unlock();
     }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
index 2e49ac5..601c5f2 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
@@ -166,9 +166,10 @@ public class PriorityBasedFileSystemView implements 
SyncableFileSystemView, Seri
   }
 
   @Override
-  public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, 
String maxCommitTime) {
-    return execute(partitionPath, maxCommitTime, 
preferredView::getLatestFileSlicesBeforeOrOn,
-        secondaryView::getLatestFileSlicesBeforeOrOn);
+  public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, 
String maxCommitTime,
+      boolean includeFileSlicesInPendingCompaction) {
+    return execute(partitionPath, maxCommitTime, 
includeFileSlicesInPendingCompaction,
+        preferredView::getLatestFileSlicesBeforeOrOn, 
secondaryView::getLatestFileSlicesBeforeOrOn);
   }
 
   @Override
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
index fa7f36c..9d6972f 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -108,6 +108,8 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
   public static final String LAST_INSTANT_TS = "lastinstantts";
   public static final String TIMELINE_HASH = "timelinehash";
   public static final String REFRESH_OFF = "refreshoff";
+  public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = 
"includependingcompaction";
+
 
   private static Logger log = 
LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
 
@@ -327,8 +329,11 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
   }
 
   @Override
-  public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, 
String maxCommitTime) {
-    Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
+  public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, 
String maxCommitTime,
+      boolean includeFileSlicesInPendingCompaction) {
+    Map<String, String> paramsMap = 
getParamsWithAdditionalParams(partitionPath,
+        new String[]{MAX_INSTANT_PARAM, 
INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
+        new String[]{maxCommitTime, 
String.valueOf(includeFileSlicesInPendingCompaction)});
     try {
       List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
           new TypeReference<List<FileSliceDTO>>() {
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index 8d09c60..4cae7a6 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -351,7 +351,7 @@ public class HoodieTableFileSystemViewTest {
     assertEquals("Log File Order check", fileName2, 
logFiles.get(2).getFileName());
     assertEquals("Log File Order check", fileName1, 
logFiles.get(3).getFileName());
 
-    fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, 
deltaInstantTime5)
+    fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, 
deltaInstantTime5, true)
         .collect(Collectors.toList());
     assertEquals("Expect only one file-id", 1, fileSliceList.size());
     fileSlice = fileSliceList.get(0);
@@ -672,7 +672,7 @@ public class HoodieTableFileSystemViewTest {
     assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 
TEST_WRITE_TOKEN, fileId3)));
 
     filenames = Sets.newHashSet();
-    List<HoodieLogFile> logFilesList = 
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4)
+    List<HoodieLogFile> logFilesList = 
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
         .map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList)
         .collect(Collectors.toList());
     assertEquals(logFilesList.size(), 4);
@@ -706,7 +706,7 @@ public class HoodieTableFileSystemViewTest {
     }
 
     logFilesList =
-        rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", 
commitTime3).map(slice -> slice.getLogFiles())
+        rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3, 
true).map(slice -> slice.getLogFiles())
             .flatMap(logFileList -> logFileList).collect(Collectors.toList());
     assertEquals(logFilesList.size(), 1);
     assertTrue(logFilesList.get(0).getFileName()
@@ -1135,7 +1135,7 @@ public class HoodieTableFileSystemViewTest {
       assertEquals("Log File Order check", fileName3, 
logFiles.get(1).getFileName());
       assertEquals("Log File Order check", fileName1, 
logFiles.get(2).getFileName());
 
-      fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, 
deltaInstantTime5)
+      fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, 
deltaInstantTime5, true)
           .collect(Collectors.toList());
       assertEquals("Expect only one file-id", 1, fileSliceList.size());
       fileSlice = fileSliceList.get(0);
@@ -1147,6 +1147,11 @@ public class HoodieTableFileSystemViewTest {
       assertEquals("Log files must include only those after compaction 
request", 2, logFiles.size());
       assertEquals("Log File Order check", fileName4, 
logFiles.get(0).getFileName());
       assertEquals("Log File Order check", fileName3, 
logFiles.get(1).getFileName());
+
+      // Check getLatestFileSlicesBeforeOrOn excluding fileIds in pending 
compaction
+      fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, 
deltaInstantTime5, false)
+          .collect(Collectors.toList());
+      assertEquals("Expect empty list as file-id is in pending compaction", 0, 
fileSliceList.size());
     });
 
     Assert.assertEquals(3, fsView.getPendingCompactionOperations().count());
diff --git 
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
 
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
index b842626..483bc6f 100644
--- 
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
+++ 
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
@@ -265,7 +265,8 @@ public class FileSystemViewHandler {
       List<FileSliceDTO> dtos =
           
sliceHandler.getLatestFileSlicesBeforeOrOn(ctx.validatedQueryParam(BASEPATH_PARAM).getOrThrow(),
               ctx.validatedQueryParam(PARTITION_PARAM).getOrThrow(),
-              ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow());
+              ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow(),
+              
Boolean.valueOf(ctx.validatedQueryParam(INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM).getOrThrow()));
       writeValueAsString(ctx, dtos);
     }, true));
 
diff --git 
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
 
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
index c2fbb0c..01aceae 100644
--- 
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
+++ 
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
@@ -56,9 +56,9 @@ public class FileSliceHandler extends Handler {
   }
 
   public List<FileSliceDTO> getLatestFileSlicesBeforeOrOn(String basePath, 
String partitionPath,
-      String maxInstantTime) {
-    return 
viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath,
 maxInstantTime)
-        .map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
+      String maxInstantTime, boolean includeFileSlicesInPendingCompaction) {
+    return 
viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath,
 maxInstantTime,
+        
includeFileSlicesInPendingCompaction).map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
   }
 
   public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath, 
String partitionPath) {

Reply via email to