nsivabalan commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264603260
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1246,6 +1304,19 @@ Stream<FileSlice> fetchAllFileSlices(String
partitionPath) {
.flatMap(HoodieFileGroup::getAllFileSlices);
}
+ /**
+ * Default implementation for fetching all file-slices for a partition-path.
+ *
+ * @param partitionPath Partition path
+ * @return file-slice stream
+ */
+ Stream<FileSlice> fetchAllFileSlices(String partitionPath, boolean
includePending) {
Review Comment:
Do we need an access specifier here? Should this be private?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java:
##########
@@ -137,7 +137,7 @@ public void close() {
*/
private static RocksDbBasedFileSystemView
createRocksDBBasedFileSystemView(SerializableConfiguration conf,
FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
- HoodieTimeline timeline =
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+ HoodieTimeline timeline = metaClient.getActiveTimeline();
Review Comment:
ok
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -129,9 +131,15 @@ public HoodieTimeline
filterCompletedOrMajorOrMinorCompactionInstants() {
}
@Override
- public HoodieDefaultTimeline filterCompletedInstantsOrRewriteTimeline() {
- Set<String> validActions = CollectionUtils.createSet(COMPACTION_ACTION,
LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
- return new HoodieDefaultTimeline(getInstantsAsStream().filter(s ->
s.isCompleted() || validActions.contains(s.getAction())), details);
+ public HoodieTimeline filterCompletedWriteAndCompactionInstants() {
+ return getWriteTimeline().filterCompletedAndCompactionInstants();
+ }
+
+ @Override
+ public HoodieDefaultTimeline
filterCompletedAndRewriteInstants(HoodieTableMetaClient metaClient) {
Review Comment:
filterCompletedWriteAndRewriteInstants
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -165,12 +165,18 @@ public interface HoodieTimeline extends Serializable {
HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
+ /**
+ * This is the filtered write timeline that contains only completed and
pending compaction instants.
+ * This is the timeline that is used in constructing file groups.
+ */
+ HoodieTimeline filterCompletedWriteAndCompactionInstants();
+
/**
* Timeline to just include completed commits or all rewrites like
compaction, logcompaction and replace actions.
*
* @return
*/
- HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient
metaClient);
Review Comment:
What's the expected behavior of this API?
completed commits and all compaction and all replace actions
OR
only completed (writes, compaction, replace commits)
?
Can you fix the naming and Javadocs accordingly? It's kind of misleading.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1298,6 +1369,14 @@ Stream<FileSlice> fetchLatestFileSlices(String
partitionPath) {
.map(Option::get);
}
+ /**
+ * Default implementation for fetching latest file-slices for a partition
path.
+ */
+ Stream<FileSlice> fetchLatestFileSlices(String partitionPath, boolean
includePending) {
Review Comment:
Should this have an access specifier (e.g. private)?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -144,44 +172,54 @@ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingCompactionTran
*/
public static class TimelineDiffResult {
- private final List<HoodieInstant> newlySeenInstants;
+ private final List<HoodieInstant> newlySeenCompletedAndRewriteInstants;
private final List<HoodieInstant> finishedCompactionInstants;
Review Comment:
So we are not interested in removed compaction instants, is that right? Curious
to know why this is different compared to log compactions.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -72,27 +82,75 @@ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline
&& instantPair.getValue().isCompleted())
.map(Pair::getKey).collect(Collectors.toList());
- newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Collect new instants that are not present in
oldCompletedAndRewriteTimeline
+ // but are present in new active timeline.
+ List<HoodieInstant> newCompletedAndRewriteInstants = new ArrayList<>();
+ newCompletedAndRewriteTimeline.getInstantsAsStream()
+ .filter(instant -> !oldTimelineInstants.contains(instant))
+ .forEach(newCompletedAndRewriteInstants::add);
- List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
+ // Check for log compaction commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
+ getPendingLogCompactionTransitions(oldTimeline,
newCompletedAndRewriteTimeline);
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> Pair.of(instantPair.getKey(),
instantPair.getValue() != null))
+ .collect(Collectors.toList());
+
+ // Check if replacecommits are completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants =
+ getPendingReplaceCommitTransitions(oldTimeline,
newCompletedAndRewriteTimeline);
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+ .filter(instantPair -> !instantPair.getKey().isCompleted()
+ && (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
+ .map(instantPair -> Pair.of(instantPair.getKey(),
instantPair.getValue() != null))
+ .collect(Collectors.toList());
+
+ // New instants will contains instants that are newly completed commits
or newly created pending rewrite commits
+ // (i.e. compaction, log-compaction, clustering)
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newCompletedAndRewriteInstants,
finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+ finishedOrRemovedReplaceCommitInstants, true);
} else {
// One or more timelines is empty
LOG.warn("One or more timelines is empty");
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
}
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingReplaceCommitTransitions(
+ HoodieTimeline oldTimeline, HoodieTimeline
newCompletedAndRewriteTimeline) {
+ Set<HoodieInstant> newTimelineInstants =
newCompletedAndRewriteTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+ return
oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant ->
{
+ if (newTimelineInstants.contains(instant)) {
+ return Pair.of(instant, instant);
+ } else {
+ HoodieInstant completedReplaceCommit =
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instant.getTimestamp());
+ if (newTimelineInstants.contains(completedReplaceCommit)) {
+ return Pair.of(instant, completedReplaceCommit);
+ }
+ HoodieInstant inflightRepaceCommit =
+ new HoodieInstant(State.INFLIGHT,
HoodieTimeline.REPLACE_COMMIT_ACTION, instant.getTimestamp());
+ if (newTimelineInstants.contains(inflightRepaceCommit)) {
+ return Pair.of(instant, inflightRepaceCommit);
+ }
+ return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
+ }
+ }).collect(Collectors.toList());
+ }
+
/**
* Getting pending log compaction transitions.
*/
- private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
-
HoodieTimeline newTimeline) {
- Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingLogCompactionTransitions(
+ HoodieTimeline oldTimeline, HoodieTimeline
newCompletedAndRewriteTimeline) {
Review Comment:
What does newCompletedAndRewriteTimeline refer to?
Completed writes and compaction commits, and all rewrite instants (replace
commits)?
Or does it include all compaction instants too? If so, I feel we should fix
the naming — it is misleading.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -90,9 +91,11 @@ public abstract class AbstractTableFileSystemView implements
SyncableFileSystemV
protected HoodieTableMetaClient metaClient;
- // This is the commits timeline that will be visible for all views extending
this view
- // This is nothing but the write timeline, which contains both ingestion and
compaction(major and minor) writers.
- private HoodieTimeline visibleCommitsAndCompactionTimeline;
+ // Timeline determines whether a file slice is committed or not. This
timeline object is attached
Review Comment:
Can we fix the Javadocs?
```
// This is the commits timeline that will be visible for all views extending
this view. This timeline contains completed write instants, all compaction
instants(including pending).
This timeline object is attached to each HoodieFileGroup. Using this
timeline object, HoodieFileGroup determines whether a fileSlice is committed or
not.
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
*
* @return
*/
- HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline filterCompletedAndRewriteInstants();
Review Comment:
Generally, we have used "rewrite" to denote only replace commits. So, are we
changing the definition?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -165,12 +165,18 @@ public interface HoodieTimeline extends Serializable {
HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
+ /**
+ * This is the filtered write timeline that contains only completed and
pending compaction instants.
+ * This is the timeline that is used in constructing file groups.
+ */
+ HoodieTimeline filterCompletedWriteAndCompactionInstants();
+
/**
* Timeline to just include completed commits or all rewrites like
compaction, logcompaction and replace actions.
*
* @return
*/
- HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline filterCompletedAndRewriteInstants(HoodieTableMetaClient
metaClient);
Review Comment:
Are you introducing a new term called "rewrite" with this patch, or
is there a precedent already?
My understanding is that "rewrite" refers only to replace commits, but in some
APIs in this patch we are using "rewrite" to refer to compaction as well. Can
we ensure it's uniform across the board?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -130,36 +141,50 @@ private void runIncrementalSync(HoodieTimeline timeline,
TimelineDiffResult diff
});
// Now remove pending log compaction instants which were completed or
removed
-
diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant
-> {
+
diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instantPair
-> {
+ try {
+ removePendingLogCompactionInstant(instantPair.getKey(),
instantPair.getValue());
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ });
+
+ // Now remove pending replace instants which were completed or removed
+
diffResult.getFinishedOrRemovedReplaceInstants().stream().forEach(instantPair
-> {
try {
- removePendingLogCompactionInstant(instant);
+ removePendingFileGroupsInPendingClustering(instantPair.getKey(),
instantPair.getValue());
} catch (IOException e) {
throw new HoodieException(e);
}
});
- // Add new completed instants found in the latest timeline, this also
contains inflight instants.
- diffResult.getNewlySeenInstants().stream()
- .filter(instant -> instant.isCompleted()
- || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
- ||
instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION))
+ HoodieTimeline newFileSystemViewTimeline =
+ newActiveTimeline.getFileSystemViewTimeline();
+ // Add new completed instants found in the latest timeline,
+ // this also contains inflight instants from rewrite timeline.
+ diffResult.getNewlySeenCompletedAndRewriteInstants()
.forEach(instant -> {
try {
if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
||
instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
- addCommitInstant(timeline, instant);
+ addCommitInstant(newFileSystemViewTimeline, instant);
Review Comment:
Wouldn't newActiveTimeline be a superset of newFileSystemViewTimeline? Can we
use newActiveTimeline for all usages from L170 to L187?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant
instant) throws IOExce
}
/**
- * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit or the log compaction
+ * operation is failed, so it is no longer need to be tracked as pending.
*
* @param instant Log Compaction Instant to be removed
+ * @param isSuccessful true if log compaction operation is successful, false
if the operation is failed and rollbacked.
*/
- private void removePendingLogCompactionInstant(HoodieInstant instant) throws
IOException {
- LOG.info("Removing completed log compaction instant (" + instant + ")");
- HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
- .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ private void removePendingLogCompactionInstant(HoodieInstant instant,
boolean isSuccessful) throws IOException {
+ if (isSuccessful) {
+ LOG.info("Removing completed log compaction instant (" + instant + ")");
+ HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
+ .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ } else {
+ LOG.info("Removing failed log compaction instant (" + instant + ")");
+ removePendingLogCompactionOperations(instant.getTimestamp());
Review Comment:
I see why we are not interested in completed compaction instants now.
But wouldn't the logic be applicable to clustering commits as well then? Let's
sync up f2f.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]