This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ae3c02f HUDI-162 : File System view must be built with correct
timeline actions
ae3c02f is described below
commit ae3c02fb3fff47da8f20c4f9c8012a257a3df792
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Mon Jul 1 18:19:12 2019 -0700
HUDI-162 : File System view must be built with correct timeline actions
---
.../uber/hoodie/cli/commands/RepairsCommand.java | 2 +-
.../com/uber/hoodie/io/HoodieAppendHandle.java | 2 +-
.../uber/hoodie/table/HoodieCopyOnWriteTable.java | 4 +--
.../uber/hoodie/table/HoodieMergeOnReadTable.java | 2 +-
.../hoodie/common/model/HoodieCommitMetadata.java | 4 +--
.../uber/hoodie/common/table/HoodieTimeline.java | 6 ++++
.../table/timeline/HoodieDefaultTimeline.java | 8 +++++
.../table/view/AbstractTableFileSystemView.java | 38 ++++++++++++++++------
.../table/view/HoodieTableFileSystemView.java | 1 -
.../IncrementalTimelineSyncFileSystemView.java | 16 ++++++++-
.../table/view/RocksDbBasedFileSystemView.java | 3 +-
.../table/view/HoodieTableFileSystemViewTest.java | 32 ++++++++++++++----
12 files changed, 91 insertions(+), 27 deletions(-)
diff --git
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
index 839cd5c..8967f2b 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
@@ -86,7 +86,7 @@ public class RepairsCommand implements CommandMarker {
int ind = 0;
for (String partition : partitionPaths) {
- Path partitionPath = new Path(basePath, partition);
+ Path partitionPath = FSUtils.getPartitionPath(basePath, partition);
String[] row = new String[3];
row[0] = partition;
row[1] = "Yes";
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index cc98f56..99d3c63 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -274,7 +274,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieWri
Optional<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
return HoodieLogFormat.newWriterBuilder()
- .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
+
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 6b9f4f6..e1c4b99 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -315,7 +315,7 @@ public class HoodieCopyOnWriteTable<T extends
HoodieRecordPayload> extends Hoodi
throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
- FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(),
partitionPath), filter);
+ FileStatus[] toBeDeleted =
fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
@@ -340,7 +340,7 @@ public class HoodieCopyOnWriteTable<T extends
HoodieRecordPayload> extends Hoodi
}
return false;
};
- FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(),
partitionPath), filter);
+ FileStatus[] toBeDeleted =
fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 589b709..af45052 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -507,7 +507,7 @@ public class HoodieMergeOnReadTable<T extends
HoodieRecordPayload> extends
String baseCommitTime =
fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
- new Path(this.getMetaClient().getBasePath(), partitionPath))
+ FSUtils.getPartitionPath(this.getMetaClient().getBasePath(),
partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
.withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
index 42ad091..b76ba46 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
@@ -30,7 +31,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -107,7 +107,7 @@ public class HoodieCommitMetadata implements Serializable {
HashMap<String, String> fullPaths = new HashMap<>();
for (Map.Entry<String, String> entry :
getFileIdAndRelativePaths().entrySet()) {
String fullPath =
- (entry.getValue() != null) ? (new Path(basePath,
entry.getValue())).toString() : null;
+ (entry.getValue() != null) ? (FSUtils.getPartitionPath(basePath,
entry.getValue())).toString() : null;
fullPaths.put(entry.getKey(), fullPath);
}
return fullPaths;
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
index 24cab8d..642c08d 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
@@ -106,6 +106,12 @@ public interface HoodieTimeline extends Serializable {
HoodieTimeline filterCompletedAndCompactionInstants();
/**
+ * Filter this timeline to just include commits (commit/deltacommit) and compaction
actions
+ * @return New timeline containing only commit, deltacommit and compaction instants
+ */
+ HoodieTimeline getCommitsAndCompactionTimeline();
+
+ /**
* Filter this timeline to just include requested and inflight compaction
instants
* @return
*/
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
index d77b9fe..ed80c3c 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,6 +18,7 @@
package com.uber.hoodie.common.table.timeline;
+import com.google.common.collect.Sets;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.StringUtils;
import com.uber.hoodie.exception.HoodieException;
@@ -25,6 +26,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -103,6 +105,12 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
@Override
+ public HoodieTimeline getCommitsAndCompactionTimeline() {
+ Set<String> validActions = Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, COMPACTION_ACTION);
+ return new HoodieDefaultTimeline(instants.stream().filter(s ->
validActions.contains(s.getAction())), details);
+ }
+
+ @Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
instants.stream().filter(s ->
s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)),
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index 8b05b69..62410ac 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -71,8 +71,8 @@ public abstract class AbstractTableFileSystemView implements
SyncableFileSystemV
protected HoodieTableMetaClient metaClient;
- // This is the commits that will be visible for all views extending this view
- protected HoodieTimeline visibleActiveTimeline;
+ // This is the commits and compaction timeline that will be visible for all views extending
this view
+ private HoodieTimeline visibleCommitsAndCompactionTimeline;
// Used to concurrently load and populate partition views
private ConcurrentHashMap<String, Boolean> addedPartitions = new
ConcurrentHashMap<>(4096);
@@ -92,7 +92,8 @@ public abstract class AbstractTableFileSystemView implements
SyncableFileSystemV
*/
protected void init(HoodieTableMetaClient metaClient, HoodieTimeline
visibleActiveTimeline) {
this.metaClient = metaClient;
- this.visibleActiveTimeline = visibleActiveTimeline;
+ refreshTimeline(visibleActiveTimeline);
+
// Load Pending Compaction Operations
resetPendingCompactionOperations(
CompactionUtils.getAllPendingCompactionOperations(metaClient).values()
@@ -101,11 +102,19 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
/**
+ * Refresh commits timeline
+ * @param visibleActiveTimeline Visible Active Timeline
+ */
+ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
+ this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+ }
+
+ /**
* Adds the provided statuses into the file system view, and also caches it
inside this object.
*/
protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
HoodieTimer timer = new HoodieTimer().startTimer();
- List<HoodieFileGroup> fileGroups = buildFileGroups(statuses,
visibleActiveTimeline, true);
+ List<HoodieFileGroup> fileGroups = buildFileGroups(statuses,
visibleCommitsAndCompactionTimeline, true);
long fgBuildTimeTakenMs = timer.endTimer();
timer.startTimer();
// Group by partition for efficient updates for both InMemory and
DiskBased stuctures.
@@ -133,7 +142,6 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile>
dataFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean
addPendingCompactionFileSlice) {
-
Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = dataFileStream
.collect(Collectors.groupingBy((dataFile) -> {
String partitionPathStr =
getPartitionPathFromFilePath(dataFile.getPath());
@@ -187,7 +195,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
resetViewState();
// Initialize with new Hoodie timeline.
- init(metaClient, visibleActiveTimeline);
+ init(metaClient, getTimeline());
} finally {
writeLock.unlock();
}
@@ -288,6 +296,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
+ log.info("Pending Compaction instant for (" + fileSlice + ") is :" +
compactionWithInstantTime);
return (compactionWithInstantTime.isPresent())
&&
fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
}
@@ -300,6 +309,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
*/
protected FileSlice filterDataFileAfterPendingCompaction(FileSlice
fileSlice) {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
+ log.info("File Slice (" + fileSlice + ") is in pending compaction");
// Data file is filtered out of the file-slice as the corresponding
compaction
// instant not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
@@ -417,7 +427,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchAllDataFiles(partitionPath)
- .filter(df ->
visibleActiveTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
+ .filter(df ->
visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
.filter(df -> !isDataFileDueToPendingCompaction(df));
} finally {
readLock.unlock();
@@ -794,12 +804,12 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
@Override
public Option<HoodieInstant> getLastInstant() {
- return Option.fromJavaOptional(visibleActiveTimeline.lastInstant());
+ return Option.fromJavaOptional(getTimeline().lastInstant());
}
@Override
public HoodieTimeline getTimeline() {
- return visibleActiveTimeline;
+ return visibleCommitsAndCompactionTimeline;
}
@Override
@@ -822,10 +832,18 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
* @param newTimeline New Hoodie Timeline
*/
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline
newTimeline) {
- visibleActiveTimeline = newTimeline;
+ refreshTimeline(newTimeline);
addedPartitions.clear();
resetViewState();
// Initialize with new Hoodie timeline.
init(metaClient, newTimeline);
}
+
+ /**
+ * Return only the commits and compaction timeline used for building file-groups
+ * @return Visible timeline restricted to commit, deltacommit and compaction actions
+ */
+ public HoodieTimeline getVisibleCommitsAndCompactionTimeline() {
+ return visibleCommitsAndCompactionTimeline;
+ }
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
index 2e52b9f..741d7ee 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
@@ -198,7 +198,6 @@ public class HoodieTableFileSystemView extends
IncrementalTimelineSyncFileSystem
partitionToFileGroupsMap.put(partitionPath, newList);
}
-
@Override
public Stream<HoodieFileGroup> fetchAllStoredFileGroups() {
return partitionToFileGroupsMap.values().stream().flatMap(fg -> {
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 76e7a53..c087de0 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -56,11 +56,20 @@ public abstract class IncrementalTimelineSyncFileSystemView
extends AbstractTabl
// Allows incremental Timeline syncing
private final boolean incrementalTimelineSyncEnabled;
+ // This is the visible active timeline used only for incremental view syncing
+ private HoodieTimeline visibleActiveTimeline;
+
protected IncrementalTimelineSyncFileSystemView(boolean
enableIncrementalTimelineSync) {
this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync;
}
@Override
+ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
+ this.visibleActiveTimeline = visibleActiveTimeline;
+ super.refreshTimeline(visibleActiveTimeline);
+ }
+
+ @Override
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline
newTimeline) {
try {
if (incrementalTimelineSyncEnabled) {
@@ -70,7 +79,7 @@ public abstract class IncrementalTimelineSyncFileSystemView
extends AbstractTabl
runIncrementalSync(newTimeline, diffResult);
log.info("Finished incremental sync");
// Reset timeline to latest
- visibleActiveTimeline = newTimeline;
+ refreshTimeline(newTimeline);
return;
}
}
@@ -337,4 +346,9 @@ public abstract class IncrementalTimelineSyncFileSystemView
extends AbstractTabl
buildFileGroups(viewDataFiles.values().stream(),
viewLogFiles.values().stream(), timeline, true);
storePartitionView(partition, fgs);
}
+
+ @Override
+ public HoodieTimeline getTimeline() {
+ return visibleActiveTimeline;
+ }
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
index 9edbb94..ab598e8 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
@@ -319,7 +319,8 @@ public class RocksDbBasedFileSystemView extends
IncrementalTimelineSyncFileSyste
return sliceStream.map(s -> Pair.of(Pair.of(s.getPartitionPath(),
s.getFileId()), s))
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream().map(slicePair
-> {
HoodieFileGroup fg =
- new HoodieFileGroup(slicePair.getKey().getKey(),
slicePair.getKey().getValue(), visibleActiveTimeline);
+ new HoodieFileGroup(slicePair.getKey().getKey(),
slicePair.getKey().getValue(),
+ getVisibleCommitsAndCompactionTimeline());
slicePair.getValue().forEach(e -> fg.addFileSlice(e.getValue()));
return fg;
});
diff --git
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index 4cae7a6..7a21a3d 100644
---
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -103,8 +103,7 @@ public class HoodieTableFileSystemViewTest {
fsView.close();
fsView = null;
}
- fsView = getFileSystemView(
-
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
+ fsView =
getFileSystemView(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
roView = (TableFileSystemView.ReadOptimizedView) fsView;
rtView = (TableFileSystemView.RealtimeView) fsView;
}
@@ -615,10 +614,12 @@ public class HoodieTableFileSystemViewTest {
// Put some files in the partition
String fullPartitionPath = basePath + "/2016/05/01/";
new File(fullPartitionPath).mkdirs();
- String commitTime1 = "1";
- String commitTime2 = "2";
- String commitTime3 = "3";
- String commitTime4 = "4";
+ String cleanTime1 = "1";
+ String commitTime1 = "2";
+ String commitTime2 = "3";
+ String commitTime3 = "4";
+ String commitTime4 = "5";
+
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
@@ -640,11 +641,29 @@ public class HoodieTableFileSystemViewTest {
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4,
HoodieLogFile.DELTA_EXTENSION,
commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
+ // Create commit/clean files
+ new File(basePath + "/.hoodie/" + cleanTime1 + ".clean").createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile();
+ testStreamLatestVersionInPartition(isLatestFileSliceOnly,
fullPartitionPath, commitTime1, commitTime2, commitTime3,
+ commitTime4, fileId1, fileId2, fileId3, fileId4);
+
+ // Now create a scenario where archiving has deleted the commits (commitTime1, commitTime2 and commitTime3) but
retained the clean instant cleanTime1, so cleanTime1 is now
+ // the lowest instant time. Scenario for HUDI-162 - here the clean is the
earliest action in the active timeline
+ new File(basePath + "/.hoodie/" + commitTime1 + ".commit").delete();
+ new File(basePath + "/.hoodie/" + commitTime2 + ".commit").delete();
+ new File(basePath + "/.hoodie/" + commitTime3 + ".commit").delete();
+ testStreamLatestVersionInPartition(isLatestFileSliceOnly,
fullPartitionPath, commitTime1, commitTime2, commitTime3,
+ commitTime4, fileId1, fileId2, fileId3, fileId4);
+ }
+
+ private void testStreamLatestVersionInPartition(boolean
isLatestFileSliceOnly, String fullPartitionPath,
+ String commitTime1, String commitTime2, String commitTime3, String
commitTime4, String fileId1, String fileId2,
+ String fileId3, String fileId4) throws IOException {
+
// Now we list the entire partition
FileStatus[] statuses = metaClient.getFs().listStatus(new
Path(fullPartitionPath));
assertEquals(11, statuses.length);
@@ -711,7 +730,6 @@ public class HoodieTableFileSystemViewTest {
assertEquals(logFilesList.size(), 1);
assertTrue(logFilesList.get(0).getFileName()
.equals(FSUtils.makeLogFileName(fileId2,
HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)));
-
}
@Test