This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1483f2 HUDI-148 Small file selection logic for MOR must correctly skip fileIds
selected for pending compaction
a1483f2 is described below
commit a1483f2c5f3b921d1117d31f75453e45e5717259
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Sat Jun 8 12:40:08 2019 -0700
HUDI-148 Small file selection logic for MOR must correctly skip fileIds selected for
pending compaction
---
.../main/java/com/uber/hoodie/CompactionAdminClient.java | 2 +-
.../java/com/uber/hoodie/table/HoodieMergeOnReadTable.java | 7 ++++---
.../src/test/java/com/uber/hoodie/TestCleaner.java | 2 +-
.../java/com/uber/hoodie/TestCompactionAdminClient.java | 4 ++--
.../com/uber/hoodie/common/table/TableFileSystemView.java | 10 ++++++----
.../common/table/view/AbstractTableFileSystemView.java | 12 +++++++++---
.../common/table/view/PriorityBasedFileSystemView.java | 7 ++++---
.../common/table/view/RemoteHoodieTableFileSystemView.java | 9 +++++++--
.../common/table/view/HoodieTableFileSystemViewTest.java | 13 +++++++++----
.../uber/hoodie/timeline/service/FileSystemViewHandler.java | 3 ++-
.../hoodie/timeline/service/handlers/FileSliceHandler.java | 6 +++---
11 files changed, 48 insertions(+), 27 deletions(-)
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 34f7dc1..751e7dc 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -442,7 +442,7 @@ public class CompactionAdminClient extends
AbstractHoodieClient {
.sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
FileSlice fileSliceForCompaction =
-
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(),
operation.getBaseInstantTime())
+
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(),
operation.getBaseInstantTime(), true)
.filter(fs ->
fs.getFileId().equals(operation.getFileId())).findFirst().get();
int maxUsedVersion =
fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion)
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index bcc2190..589b709 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -384,7 +384,7 @@ public class HoodieMergeOnReadTable<T extends
HoodieRecordPayload> extends
// TODO : choose last N small files since there can be multiple
small files written to a single partition
// by different spark partitions in a single batch
Optional<FileSlice> smallFileSlice = getRTFileSystemView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp()).filter(
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false).filter(
fileSlice -> fileSlice.getLogFiles().count() < 1
&& fileSlice.getDataFile().get().getFileSize() < config
.getParquetSmallFileLimit()).sorted((FileSlice left,
FileSlice right) ->
@@ -394,9 +394,10 @@ public class HoodieMergeOnReadTable<T extends
HoodieRecordPayload> extends
allSmallFileSlices.add(smallFileSlice.get());
}
} else {
- // If we can index log files, we can add more inserts to log files.
+ // If we can index log files, we can add more inserts to log files
for fileIds including those under
+ // pending compaction.
List<FileSlice> allFileSlices = getRTFileSystemView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp())
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(partitionPath, fileSlice)) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index afecd69..7798805 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -865,7 +865,7 @@ public class TestCleaner extends TestHoodieClientBase {
String baseInstantForCompaction =
fileIdToLatestInstantBeforeCompaction.get(fileId);
Optional<FileSlice> fileSliceForCompaction =
hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH,
- baseInstantForCompaction).filter(fs ->
fs.getFileId().equals(fileId)).findFirst();
+ baseInstantForCompaction, true).filter(fs ->
fs.getFileId().equals(fileId)).findFirst();
Assert.assertTrue("Base Instant for Compaction must be preserved",
fileSliceForCompaction.isPresent());
Assert.assertTrue("FileSlice has data-file",
fileSliceForCompaction.get().getDataFile().isPresent());
Assert.assertEquals("FileSlice has log-files", 2,
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
index 91f2d33..4509db6 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
@@ -280,7 +280,7 @@ public class TestCompactionAdminClient extends
TestHoodieClientBase {
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to
contain no new Log files
-
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
compactionInstant)
+
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
compactionInstant, true)
.filter(fs ->
fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
Assert.assertFalse("No Data file must be present",
fs.getDataFile().isPresent());
Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
@@ -342,7 +342,7 @@ public class TestCompactionAdminClient extends
TestHoodieClientBase {
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to
contain no new Log files
-
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
compactionInstant)
+
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
Assert.assertFalse("No Data file must be present",
fs.getDataFile().isPresent());
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
index 01ff0a5..5605e3a 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
@@ -104,13 +104,15 @@ public interface TableFileSystemView {
*/
Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath);
-
/**
- * Stream all the latest file slices in the given partition with
precondition that
- * commitTime(file) before maxCommitTime
+ * Stream the latest file slices in the given partition whose commitTime(file)
is before or on maxCommitTime
+ *
+ * @param partitionPath Partition path
+ * @param maxCommitTime Max Instant Time
+ * @param includeFileSlicesInPendingCompaction if true, include file-slices of file groups with a pending
compaction; if false, exclude those file groups entirely
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
- String maxCommitTime);
+ String maxCommitTime, boolean includeFileSlicesInPendingCompaction);
/**
* Stream all "merged" file-slices before on an instant time
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index d9a5abe..129a584 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -476,13 +476,19 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
@Override
- public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String
partitionStr, String maxCommitTime) {
+ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String
partitionStr, String maxCommitTime,
+ boolean includeFileSlicesInPendingCompaction) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
- .map(fs -> filterDataFileAfterPendingCompaction(fs));
+ Stream<FileSlice> fileSliceStream =
+ fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime);
+ if (includeFileSlicesInPendingCompaction) {
+ return fileSliceStream.map(fs ->
filterDataFileAfterPendingCompaction(fs));
+ } else {
+ return fileSliceStream.filter(fs ->
!isPendingCompactionScheduledForFileId(fs.getFileGroupId()));
+ }
} finally {
readLock.unlock();
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
index 2e49ac5..601c5f2 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java
@@ -166,9 +166,10 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
}
@Override
- public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime) {
- return execute(partitionPath, maxCommitTime,
preferredView::getLatestFileSlicesBeforeOrOn,
- secondaryView::getLatestFileSlicesBeforeOrOn);
+ public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime,
+ boolean includeFileSlicesInPendingCompaction) {
+ return execute(partitionPath, maxCommitTime,
includeFileSlicesInPendingCompaction,
+ preferredView::getLatestFileSlicesBeforeOrOn,
secondaryView::getLatestFileSlicesBeforeOrOn);
}
@Override
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
index fa7f36c..9d6972f 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -108,6 +108,8 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public static final String LAST_INSTANT_TS = "lastinstantts";
public static final String TIMELINE_HASH = "timelinehash";
public static final String REFRESH_OFF = "refreshoff";
+ public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM =
"includependingcompaction";
+
private static Logger log =
LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
@@ -327,8 +329,11 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
@Override
- public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime) {
- Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
+ public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime,
+ boolean includeFileSlicesInPendingCompaction) {
+ Map<String, String> paramsMap =
getParamsWithAdditionalParams(partitionPath,
+ new String[]{MAX_INSTANT_PARAM,
INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
+ new String[]{maxCommitTime,
String.valueOf(includeFileSlicesInPendingCompaction)});
try {
List<FileSliceDTO> dataFiles =
executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() {
diff --git
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index 8d09c60..4cae7a6 100644
---
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -351,7 +351,7 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log File Order check", fileName2,
logFiles.get(2).getFileName());
assertEquals("Log File Order check", fileName1,
logFiles.get(3).getFileName());
- fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath,
deltaInstantTime5)
+ fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath,
deltaInstantTime5, true)
.collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
@@ -672,7 +672,7 @@ public class HoodieTableFileSystemViewTest {
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4,
TEST_WRITE_TOKEN, fileId3)));
filenames = Sets.newHashSet();
- List<HoodieLogFile> logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4)
+ List<HoodieLogFile> logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
.map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList)
.collect(Collectors.toList());
assertEquals(logFilesList.size(), 4);
@@ -706,7 +706,7 @@ public class HoodieTableFileSystemViewTest {
}
logFilesList =
- rtView.getLatestFileSlicesBeforeOrOn("2016/05/01",
commitTime3).map(slice -> slice.getLogFiles())
+ rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3,
true).map(slice -> slice.getLogFiles())
.flatMap(logFileList -> logFileList).collect(Collectors.toList());
assertEquals(logFilesList.size(), 1);
assertTrue(logFilesList.get(0).getFileName()
@@ -1135,7 +1135,7 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log File Order check", fileName3,
logFiles.get(1).getFileName());
assertEquals("Log File Order check", fileName1,
logFiles.get(2).getFileName());
- fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath,
deltaInstantTime5)
+ fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath,
deltaInstantTime5, true)
.collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
@@ -1147,6 +1147,11 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log files must include only those after compaction
request", 2, logFiles.size());
assertEquals("Log File Order check", fileName4,
logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3,
logFiles.get(1).getFileName());
+
+ // Check getLatestFileSlicesBeforeOrOn excluding fileIds in pending
compaction
+ fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath,
deltaInstantTime5, false)
+ .collect(Collectors.toList());
+ assertEquals("Expect empty list as file-id is in pending compaction", 0,
fileSliceList.size());
});
Assert.assertEquals(3, fsView.getPendingCompactionOperations().count());
diff --git
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
index b842626..483bc6f 100644
---
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
+++
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java
@@ -265,7 +265,8 @@ public class FileSystemViewHandler {
List<FileSliceDTO> dtos =
sliceHandler.getLatestFileSlicesBeforeOrOn(ctx.validatedQueryParam(BASEPATH_PARAM).getOrThrow(),
ctx.validatedQueryParam(PARTITION_PARAM).getOrThrow(),
- ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow());
+ ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow(),
+
Boolean.valueOf(ctx.validatedQueryParam(INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM).getOrThrow()));
writeValueAsString(ctx, dtos);
}, true));
diff --git
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
index c2fbb0c..01aceae 100644
---
a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
+++
b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java
@@ -56,9 +56,9 @@ public class FileSliceHandler extends Handler {
}
public List<FileSliceDTO> getLatestFileSlicesBeforeOrOn(String basePath,
String partitionPath,
- String maxInstantTime) {
- return
viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath,
maxInstantTime)
- .map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
+ String maxInstantTime, boolean includeFileSlicesInPendingCompaction) {
+ return
viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath,
maxInstantTime,
+
includeFileSlicesInPendingCompaction).map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
}
public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath,
String partitionPath) {