This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c9c7c1019f5 [HUDI-6952] Skip reading the uncommitted log files for log
reader (#9879)
c9c7c1019f5 is described below
commit c9c7c1019f5ca903ad3782f79370663f5ae26cb9
Author: Danny Chan <[email protected]>
AuthorDate: Sat Oct 21 04:21:14 2023 +0800
[HUDI-6952] Skip reading the uncommitted log files for log reader (#9879)
This avoids potential exceptions when the reader is processing an uncommitted
log file while the cleaning or rollback service removes that log file.
---
.../hudi/client/TestCompactionAdminClient.java | 14 ++--
.../table/log/AbstractHoodieLogRecordReader.java | 24 ++-----
.../table/timeline/CompletionTimeQueryView.java | 19 +++--
.../table/timeline/HoodieDefaultTimeline.java | 57 +++++++++++----
.../table/view/AbstractTableFileSystemView.java | 84 +++++++++++++---------
.../table/view/TestHoodieTableFileSystemView.java | 23 +++---
6 files changed, 129 insertions(+), 92 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 4177297a6ba..20677bf7c85 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
import org.apache.hudi.client.CompactionAdminClient.ValidationOpResult;
import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -208,7 +207,6 @@ public class TestCompactionAdminClient extends
HoodieClientTestBase {
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline());
Set<String> commitsWithDataFile = CollectionUtils.createSet("000", "004");
- Set<String> commitsWithLogAfterCompactionRequest =
CollectionUtils.createSet("000", "002");
// Expect each file-slice whose base-commit is same as compaction commit
to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().compareTo(compactionInstant) <=
0)
@@ -218,16 +216,12 @@ public class TestCompactionAdminClient extends
HoodieClientTestBase {
} else {
assertFalse(fs.getBaseFile().isPresent(), "No Data file should be
present");
}
- if
(commitsWithLogAfterCompactionRequest.contains(fs.getBaseInstantTime())) {
- assertEquals(4, fs.getLogFiles().count(), "Has Log Files");
- } else {
- assertEquals(2, fs.getLogFiles().count(), "Has Log Files");
- }
+ assertEquals(2, fs.getLogFiles().count(), "Has Log Files");
});
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
-
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
+
newFsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
ingestionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -246,7 +240,7 @@ public class TestCompactionAdminClient extends
HoodieClientTestBase {
ensureValidCompactionPlan(compactionInstant);
// Check suggested rename operations
- metaClient =
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+ metaClient.reloadActiveTimeline();
// Log files belonging to file-slices created because of compaction
request must be renamed
@@ -277,7 +271,7 @@ public class TestCompactionAdminClient extends
HoodieClientTestBase {
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
-
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(HoodieFileGroup::getAllFileSlices)
+
newFsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0],
ingestionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 3678efe7862..17c77d807c0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -235,13 +235,12 @@ public abstract class AbstractHoodieLogRecordReader {
totalLogBlocks = new AtomicLong(0);
totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
- HoodieTimeline commitsTimeline =
this.hoodieTableMetaClient.getCommitsTimeline();
- HoodieTimeline completedInstantsTimeline =
commitsTimeline.filterCompletedInstants();
- HoodieTimeline inflightInstantsTimeline =
commitsTimeline.filterInflights();
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
- logFilePaths.stream().map(logFile -> new HoodieLogFile(new
CachingPath(logFile))).collect(Collectors.toList()),
+ logFilePaths.stream()
+ .map(filePath -> new HoodieLogFile(new CachingPath(filePath)))
+ .collect(Collectors.toList()),
readerSchema, true, reverseReader, bufferSize,
shouldLookupRecords(), recordKeyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -269,11 +268,6 @@ public abstract class AbstractHoodieLogRecordReader {
break;
}
if (logBlock.getBlockType() != CORRUPT_BLOCK &&
logBlock.getBlockType() != COMMAND_BLOCK) {
- if
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
- || inflightInstantsTimeline.containsInstant(instantTime)) {
- // hit an uncommitted block possibly from a failed write, move to
the next one and skip processing this one
- continue;
- }
if (instantRange.isPresent() &&
!instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
@@ -548,13 +542,12 @@ public abstract class AbstractHoodieLogRecordReader {
totalLogBlocks = new AtomicLong(0);
totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
- HoodieTimeline commitsTimeline =
this.hoodieTableMetaClient.getCommitsTimeline();
- HoodieTimeline completedInstantsTimeline =
commitsTimeline.filterCompletedInstants();
- HoodieTimeline inflightInstantsTimeline =
commitsTimeline.filterInflights();
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
- logFilePaths.stream().map(logFile -> new HoodieLogFile(new
CachingPath(logFile))).collect(Collectors.toList()),
+ logFilePaths.stream()
+ .map(logFile -> new HoodieLogFile(new CachingPath(logFile)))
+ .collect(Collectors.toList()),
readerSchema, true, reverseReader, bufferSize,
shouldLookupRecords(), recordKeyField, internalSchema);
/**
@@ -621,11 +614,6 @@ public abstract class AbstractHoodieLogRecordReader {
break;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
- if
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
- || inflightInstantsTimeline.containsInstant(instantTime)) {
- // hit an uncommitted block possibly from a failed write, move to
the next one and skip processing this one
- continue;
- }
if (instantRange.isPresent() &&
!instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 104c40907a0..290f31ff344 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -34,6 +34,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* Query view for instant completion time.
@@ -57,9 +58,9 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
private final String startInstant;
/**
- * The first instant on the active timeline, used for query optimization.
+ * The first write instant on the active timeline, used for query
optimization.
*/
- private final String firstInstantOnActiveTimeline;
+ private final String firstNonSavepointCommit;
/**
* The constructor.
@@ -79,8 +80,9 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
startInstant) {
this.metaClient = metaClient;
this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
- this.startInstant = startInstant;
- this.firstInstantOnActiveTimeline =
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("");
+ this.startInstant = minInstant(startInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+ // Note: use getWriteTimeline() to stay in sync with the fs view's visibleCommitsAndCompactionTimeline; see AbstractTableFileSystemView.refreshTimeline.
+ this.firstNonSavepointCommit =
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
load();
}
@@ -88,7 +90,8 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
* Returns whether the instant is completed.
*/
public boolean isCompleted(String instantTime) {
- return getCompletionTime(instantTime).isPresent();
+ return this.startToCompletionInstantTimeMap.containsKey(instantTime)
+ || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN,
this.firstNonSavepointCommit);
}
/**
@@ -154,7 +157,7 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
if (completionTime != null) {
return Option.of(completionTime);
}
- if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN,
this.firstInstantOnActiveTimeline)) {
+ if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN,
this.startInstant)) {
// the instant is still pending
return Option.empty();
}
@@ -199,6 +202,10 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
this.startToCompletionInstantTimeMap.putIfAbsent(instantTime,
completionTime);
}
+ private static String minInstant(String instant1, String instant2) {
+ return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 :
instant2;
+ }
+
@Override
public void close() throws Exception {
this.startToCompletionInstantTimeMap.clear();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 4b3386e5b3c..0ee3bd5bee4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -56,6 +56,10 @@ public class HoodieDefaultTimeline implements HoodieTimeline
{
protected transient Function<HoodieInstant, Option<byte[]>> details;
private List<HoodieInstant> instants;
+ // for efficient #contains queries.
+ private transient volatile Set<String> instantTimeSet;
+ // for efficient #isBeforeTimelineStarts check.
+ private transient volatile Option<HoodieInstant> firstNonSavepointCommit;
private String timelineHash;
public HoodieDefaultTimeline(Stream<HoodieInstant> instants,
Function<HoodieInstant, Option<byte[]>> details) {
@@ -428,7 +432,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
@Override
public boolean containsInstant(String ts) {
// Check for 0.10.0+ timestamps which have msec granularity
- if (getInstantsAsStream().anyMatch(s -> s.getTimestamp().equals(ts))) {
+ if (getOrCreateInstantSet().contains(ts)) {
return true;
}
@@ -480,20 +484,14 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
public Option<HoodieInstant> getFirstNonSavepointCommit() {
- Option<HoodieInstant> firstCommit = firstInstant();
- Set<String> savepointTimestamps = getInstantsAsStream()
- .filter(entry ->
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
- .map(HoodieInstant::getTimestamp)
- .collect(Collectors.toSet());
- Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
- if (!savepointTimestamps.isEmpty()) {
- // There are chances that there could be holes in the timeline due to
archival and savepoint interplay.
- // So, the first non-savepoint commit is considered as beginning of the
active timeline.
- firstNonSavepointCommit = Option.fromJavaOptional(getInstantsAsStream()
- .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
- .findFirst());
+ if (this.firstNonSavepointCommit == null) {
+ synchronized (this) {
+ if (this.firstNonSavepointCommit == null) {
+ this.firstNonSavepointCommit =
findFirstNonSavepointCommit(this.instants);
+ }
+ }
}
- return firstNonSavepointCommit;
+ return this.firstNonSavepointCommit;
}
public Option<HoodieInstant> getLastClusterCommit() {
@@ -508,7 +506,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
}).findFirst());
}
-
+
@Override
public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return details.apply(instant);
@@ -524,6 +522,35 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
return this.getClass().getName() + ": " +
getInstantsAsStream().map(Object::toString).collect(Collectors.joining(","));
}
+ private Set<String> getOrCreateInstantSet() {
+ if (this.instantTimeSet == null) {
+ synchronized (this) {
+ if (this.instantTimeSet == null) {
+ this.instantTimeSet =
this.instants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+ }
+ }
+ }
+ return this.instantTimeSet;
+ }
+
+ /**
+ * Returns the first non-savepoint commit on the timeline.
+ */
+ private static Option<HoodieInstant>
findFirstNonSavepointCommit(List<HoodieInstant> instants) {
+ Set<String> savepointTimestamps = instants.stream()
+ .filter(entry ->
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toSet());
+ if (!savepointTimestamps.isEmpty()) {
+ // There are chances that there could be holes in the timeline due to
archival and savepoint interplay.
+ // So, the first non-savepoint commit is considered as beginning of the
active timeline.
+ return Option.fromJavaOptional(instants.stream()
+ .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+ .findFirst());
+ }
+ return Option.fromJavaOptional(instants.stream().findFirst());
+ }
+
/**
* Merge this timeline with the given timeline.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 7e0d381bfb0..b3dc0fbce0a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -554,32 +554,22 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
/**
- * Returns true if the file-group is under pending-compaction and the
file-slice' baseInstant matches compaction
- * Instant.
- *
- * @param fileSlice File Slice
- */
- protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
- Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
- getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
- return (compactionWithInstantTime.isPresent())
- &&
fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
- }
-
- /**
- * With async compaction, it is possible to see partial/complete base-files
due to inflight-compactions, Ignore those
- * base-files.
+ * Ignores the uncommitted base and log files.
*
* @param fileSlice File Slice
* @param includeEmptyFileSlice include empty file-slice
*/
- protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice
fileSlice, boolean includeEmptyFileSlice) {
- if (isFileSliceAfterPendingCompaction(fileSlice)) {
- LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
- // Base file is filtered out of the file-slice as the corresponding
compaction
- // instant not completed yet.
+ private Stream<FileSlice> filterUncommittedFiles(FileSlice fileSlice,
boolean includeEmptyFileSlice) {
+ Option<HoodieBaseFile> committedBaseFile =
fileSlice.getBaseFile().isPresent() &&
completionTimeQueryView.isCompleted(fileSlice.getBaseInstantTime()) ?
fileSlice.getBaseFile() : Option.empty();
+ List<HoodieLogFile> committedLogFiles =
fileSlice.getLogFiles().filter(logFile ->
completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())).collect(Collectors.toList());
+ if ((fileSlice.getBaseFile().isPresent() && !committedBaseFile.isPresent())
+ || committedLogFiles.size() != fileSlice.getLogFiles().count()) {
+ LOG.debug("File Slice (" + fileSlice + ") has uncommitted files.");
+ // A file is filtered out of the file-slice if the corresponding
+ // instant has not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
fileSlice.getBaseInstantTime(), fileSlice.getFileId());
- fileSlice.getLogFiles().forEach(transformed::addLogFile);
+ committedBaseFile.ifPresent(transformed::setBaseFile);
+ committedLogFiles.forEach(transformed::addLogFile);
if (transformed.isEmpty() && !includeEmptyFileSlice) {
return Stream.of();
}
@@ -588,6 +578,25 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return Stream.of(fileSlice);
}
+ /**
+ * Ignores the uncommitted log files.
+ *
+ * @param fileSlice File Slice
+ */
+ private FileSlice filterUncommittedLogs(FileSlice fileSlice) {
+ List<HoodieLogFile> committedLogFiles =
fileSlice.getLogFiles().filter(logFile ->
completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())).collect(Collectors.toList());
+ if (committedLogFiles.size() != fileSlice.getLogFiles().count()) {
+ LOG.debug("File Slice (" + fileSlice + ") has uncommitted log files.");
+ // A file is filtered out of the file-slice if the corresponding
+ // instant has not completed yet.
+ FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+ fileSlice.getBaseFile().ifPresent(transformed::setBaseFile);
+ committedLogFiles.forEach(transformed::addLogFile);
+ return transformed;
+ }
+ return fileSlice;
+ }
+
protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup
fileGroup) {
boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
.anyMatch(fs ->
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
@@ -808,7 +817,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlices(partitionPath)
.filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
- .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice,
true))
+ .flatMap(slice -> this.filterUncommittedFiles(slice, true))
.map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
@@ -831,7 +840,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
if (!fs.isPresent()) {
return Option.empty();
}
- return
Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(),
true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
+ return Option.ofNullable(filterUncommittedFiles(fs.get(),
true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
}
} finally {
readLock.unlock();
@@ -873,16 +882,15 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
.filter(slice ->
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
.map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
- return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice
-> this.filterBaseFileAfterPendingCompaction(slice, false)))
- .map(sliceStream ->
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+ return
allFileSliceStream.map(this::getLatestFileSliceFilteringUncommittedFiles)
+ .filter(Option::isPresent).map(Option::get)
.map(this::addBootstrapBaseFileIfPresent);
} else {
return allFileSliceStream
.map(sliceStream ->
- Option.fromJavaOptional(sliceStream
+ getLatestFileSliceFilteringUncommittedFiles(sliceStream
.filter(slice ->
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
- .filter(slice -> !slice.isEmpty())
- .findFirst()))
+ .filter(slice -> !slice.isEmpty())))
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
}
} finally {
@@ -890,6 +898,16 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
}
+ /**
+ * Looks for the latest file slice that is not empty after filtering out the
uncommitted files.
+ *
+ * <p>Note: Checks from the latest file slice first for efficiency. There is no need to check
+ * every file slice, because uncommitted files normally exist only in the latest file slice.
+ */
+ private Option<FileSlice>
getLatestFileSliceFilteringUncommittedFiles(Stream<FileSlice> fileSlices) {
+ return Option.fromJavaOptional(fileSlices.flatMap(fileSlice ->
filterUncommittedFiles(fileSlice, false)).findFirst());
+ }
+
@Override
public final Map<String, Stream<FileSlice>>
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
try {
@@ -900,8 +918,8 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
partitionPath -> fetchAllStoredFileGroups(partitionPath)
.filter(slice ->
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
.map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime))
- .map(sliceStream -> sliceStream.flatMap(slice ->
this.filterBaseFileAfterPendingCompaction(slice, false)))
- .map(sliceStream ->
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+ .map(this::getLatestFileSliceFilteringUncommittedFiles)
+ .filter(Option::isPresent).map(Option::get)
.map(this::addBootstrapBaseFileIfPresent)
));
} finally {
@@ -921,7 +939,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
Option<FileSlice> fileSlice =
fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
// if the file-group is under construction, pick the latest before
compaction instant time.
if (fileSlice.isPresent()) {
- fileSlice = Option.of(fetchMergedFileSlice(fileGroup,
fileSlice.get()));
+ fileSlice = Option.of(fetchMergedFileSlice(fileGroup,
filterUncommittedLogs(fileSlice.get())));
}
return fileSlice;
}).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
@@ -951,7 +969,9 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return fetchAllStoredFileGroups(partition)
.filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(),
maxInstantTime))
.map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup,
maxInstantTime))
-
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+ .filter(Option::isPresent).map(Option::get)
+ .map(this::filterUncommittedLogs)
+ .map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 5462a671f8c..ab8a1fd3aa2 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -445,10 +445,9 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
assertThat("Base Instant for file-group set correctly",
fileSlice.getBaseInstantTime(), is(instantTime1));
assertThat("File-Id must be set correctly", fileSlice.getFileId(),
is(fileId));
List<HoodieLogFile> logFiles =
fileSlice.getLogFiles().collect(Collectors.toList());
- assertEquals(3, logFiles.size(), "Correct number of log-files shows up in
file-slice");
- assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order
check");
- assertEquals(deltaFile2, logFiles.get(1).getFileName(), "Log File Order
check");
- assertEquals(deltaFile1, logFiles.get(2).getFileName(), "Log File Order
check");
+ assertEquals(2, logFiles.size(), "Correct number of log-files shows up in
file-slice");
+ assertEquals(deltaFile2, logFiles.get(0).getFileName(), "Log File Order
check");
+ assertEquals(deltaFile1, logFiles.get(1).getFileName(), "Log File Order
check");
// schedules a compaction
String compactionInstantTime1 = metaClient.createNewInstantTime(); // 60
-> 80
@@ -473,8 +472,7 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
assertThat("Base Instant for file-group set correctly",
fileSlice.getBaseInstantTime(), is(compactionInstantTime1));
assertThat("File-Id must be set correctly", fileSlice.getFileId(),
is(fileId));
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
- assertEquals(1, logFiles.size(), "Correct number of log-files shows up in
file-slice");
- assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order
check");
+ assertEquals(0, logFiles.size(), "Correct number of log-files shows up in
file-slice");
// now finished the compaction
saveAsComplete(commitTimeline, compactionInstant, Option.empty());
@@ -1037,6 +1035,9 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
testStreamLatestVersionInPartition(isLatestFileSliceOnly,
fullPartitionPath, commitTime1, commitTime2, commitTime3,
commitTime4, fileId1, fileId2, fileId3, fileId4);
+ // Note: the separate archiving of clean and rollback actions has been removed since 1.0.0;
+ // all instants are now archived continuously.
+
// Now create a scenario where archiving deleted commits (1,2, and 3) but
retained cleaner clean1. Now clean1 is
// the lowest commit time. Scenario for HUDI-162 - Here clean is the
earliest action in active timeline
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").delete();
@@ -1078,7 +1079,7 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
filenames = new HashSet<>();
List<HoodieLogFile> logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
- .map(FileSlice::getLogFiles).flatMap(logFileList ->
logFileList).collect(Collectors.toList());
+ .flatMap(FileSlice::getLogFiles).collect(Collectors.toList());
assertEquals(4, logFilesList.size());
for (HoodieLogFile logFile : logFilesList) {
filenames.add(logFile.getFileName());
@@ -1422,8 +1423,8 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
String fullPartitionPath3 = basePath + "/" + partitionPath3 + "/";
new File(fullPartitionPath3).mkdirs();
String instantTime1 = "1";
- String deltaInstantTime1 = "2";
- String deltaInstantTime2 = "3";
+ String deltaInstantTime1 = "3";
+ String deltaInstantTime2 = "4";
String fileId = UUID.randomUUID().toString();
String dataFileName = FSUtils.makeBaseFileName(instantTime1,
TEST_WRITE_TOKEN, fileId);
@@ -1489,8 +1490,8 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
// Fake delta-ingestion after compaction-requested
- String deltaInstantTime4 = "4";
- String deltaInstantTime5 = "6";
+ String deltaInstantTime4 = "5";
+ String deltaInstantTime5 = "7";
String fileName3 =
FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
deltaInstantTime4, 0, TEST_WRITE_TOKEN);
String fileName4 =