nsivabalan commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264287885
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
*
* @return
*/
- HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline filterCompletedAndRewriteInstants();
Review Comment:
We should name it "filterCompletedRewriteInstants":
"rewrite" refers to commits, delta commits, and replace commits;
"completed" refers to the state.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -53,6 +53,7 @@ public static Comparator<String>
getReverseCommitTimeComparator() {
/**
* Timeline, based on which all getter work.
+ * This should be a write timeline that contains either completed instants
or pending compaction instants.
Review Comment:
Can we rename the variable then, e.g. to
completedWriteAndCompactionTimeline?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -146,17 +189,22 @@ public static class TimelineDiffResult {
private final List<HoodieInstant> newlySeenInstants;
private final List<HoodieInstant> finishedCompactionInstants;
- private final List<HoodieInstant> finishedOrRemovedLogCompactionInstants;
+ // Completed instants will have true as the value where as instants
removed due to rollback will have false as value.
+ private final List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants;
+ // Completed instants will have true as the value where as instants
removed due to rollback will have false as value.
+ private final List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants;
private final boolean canSyncIncrementally;
public static final TimelineDiffResult UNSAFE_SYNC_RESULT =
- new TimelineDiffResult(null, null, null, false);
+ new TimelineDiffResult(null, null, null, null, false);
public TimelineDiffResult(List<HoodieInstant> newlySeenInstants,
List<HoodieInstant> finishedCompactionInstants,
- List<HoodieInstant>
finishedOrRemovedLogCompactionInstants, boolean canSyncIncrementally) {
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants,
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants, boolean canSyncIncrementally) {
this.newlySeenInstants = newlySeenInstants;
this.finishedCompactionInstants = finishedCompactionInstants;
this.finishedOrRemovedLogCompactionInstants =
finishedOrRemovedLogCompactionInstants;
+ this.finishedOrRemovedReplaceCommitInstants =
finishedOrRemovedReplaceCommitInstants;
Review Comment:
If your answer to my previous comment is "only clustering", should we rename all
these variables accordingly? It might confuse people down the line.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // Check for replace commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants =
getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+ .filter(instantPair -> !instantPair.getKey().isCompleted()
+ && (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // New instants will contains instants that are newly completed commits
or newly created pending rewrite commits
+ // (i.e. compaction, logcompaciton, replacecommit)
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants,
+ finishedOrRemovedReplaceCommitInstants, true);
} else {
// One or more timelines is empty
LOG.warn("One or more timelines is empty");
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
}
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
Review Comment:
Btw, we are not interested in any replace commit, right — only the clustering
ones? Can you point me to the code where we filter only for clustering replace
commits?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,14 +134,20 @@ protected void init(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActi
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
- this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getWriteTimeline();
+ this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline
+ .getWriteTimeline()
+ .filterCompletedAndCompactionInstants();
}
/**
* Adds the provided statuses into the file system view, and also caches it
inside this object.
*/
public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
- HoodieTimer timer = HoodieTimer.start();
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ // Timeline determines whether a file slice is committed or not. Timeline
object is attached to each HoodieFileGroup.
+ // Using this timeline object HoodieFileGroup determines whether a
fileSlice is committed or not.
+ // visibleCommitsAndCompactionTimeline contains entire write timeline with
pending commits, so filtering it
Review Comment:
This comment does not make sense.
In L137, we already filter for the completed and compaction timeline. So why
add this comment here? In fact, we don't do any filtering in this code
snippet (L146 to 150), so it's confusing.
Or did you mean to add this comment at L137?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
Review Comment:
Can we simplify the map as below:
```
.map(instantPair -> Pair.of(instantPair.getKey(), instantPair.getValue() !=
null))
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1246,6 +1301,19 @@ Stream<FileSlice> fetchAllFileSlices(String
partitionPath) {
.flatMap(HoodieFileGroup::getAllFileSlices);
}
+ /**
+ * Default implementation for fetching all file-slices for a partition-path.
+ *
+ * @param partitionPath Partition path
+ * @return file-slice stream
+ */
+ Stream<FileSlice> fetchAllFileSlices(String partitionPath, boolean
includePending) {
Review Comment:
Same comment as above. Can we try to reuse code as much as possible?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java:
##########
@@ -137,7 +137,7 @@ public void close() {
*/
private static RocksDbBasedFileSystemView
createRocksDBBasedFileSystemView(SerializableConfiguration conf,
FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
- HoodieTimeline timeline =
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+ HoodieTimeline timeline = metaClient.getActiveTimeline();
Review Comment:
Can you tell me how you chased down and fixed all of these?
Did you find the caller stack trace for
AbstractHoodieFileSystemview.init(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActiveTimeline) and fix all of them?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -170,15 +176,25 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[]
statuses) {
/**
* Build FileGroups from passed in file-status.
+ * TODO: Note here timeline is completedAndCompactionTimeline.
Review Comment:
can you rename the "timeline" to "completedWriteAndCompactionTimeline" so
that all the call stack knows what this timeline is about.
in fact, we can chase and fix all places.
for eg, we can rename in the constructor of HoodieFileGroup as well.
will avoid mistakes by someone in future who might make changes only in
HoodieFileGroup for instance.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -787,6 +803,21 @@ public final Stream<FileSlice> getLatestFileSlices(String
partitionStr) {
}
}
+ @Override
+ public final Stream<FileSlice> getLatestFileSlices(String partitionStr,
boolean includePending) {
Review Comment:
Why can't we make one of these call the other, i.e.
getLatestFileSlices(String partitionStr)?
I see an opportunity for code reuse here as well:
```
/**
* Default implementation for fetching latest file-slices for a partition
path.
*/
Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
return
fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlice).filter(Option::isPresent)
.map(Option::get);
}
/**
* Default implementation for fetching latest file-slices for a partition
path.
*/
Stream<FileSlice> fetchLatestFileSlices(String partitionPath, boolean
includePending) {
return fetchAllStoredFileGroups(partitionPath).map(fileGroup ->
fileGroup.getLatestFileSlice(includePending)).filter(Option::isPresent)
.map(Option::get);
}
```
We can just call fileGroup.getLatestFileSlice(false) in the other method.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant
instant) throws IOExce
}
/**
- * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit or the log compaction
Review Comment:
minor.
`Remove pending log compaction instant`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java:
##########
@@ -544,10 +574,13 @@ protected Option<HoodieInstant> getReplaceInstant(final
HoodieFileGroupId fileGr
}
private Stream<HoodieFileGroup> getFileGroups(Stream<FileSlice> sliceStream)
{
+ // HoodieFileGroup using timeline to determine whether a file slice is
committed or not so passing
+ // completedAndCompactionTimeline object while creating HoodieFileGroup.
+ HoodieTimeline completedAndCompactionTimeline =
getVisibleCommitsAndCompactionTimeline();
Review Comment:
Maybe we should name this
completedWriteAndCompactionTimeline.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java:
##########
@@ -325,6 +325,15 @@ private void registerFileSlicesAPI() {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INCLUDE_PENDING_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
Review Comment:
Shouldn't we fix this to LATEST_PARTITION_SLICES_INCLUDE_PENDING?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -327,11 +357,43 @@ private void addRollbackInstant(HoodieTimeline timeline,
HoodieInstant instant)
LOG.info("Done Syncing rollback instant (" + instant + ")");
}
+ /**
+ * Add pending replace file groups
+ * @param pendingClusteringInstant pending instant
+ * @throws IOException
+ */
+ private void addPendingClusteringInstant(HoodieInstant
pendingClusteringInstant) throws IOException {
+ LOG.info("Syncing pending clustering instant (" + pendingClusteringInstant
+ ")");
+ Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupStreamToInstant =
+
ClusteringUtils.getFileGroupEntriesFromClusteringInstant(pendingClusteringInstant,
metaClient);
+ addFileGroupsInPendingClustering(fileGroupStreamToInstant);
+ }
+
+ /**
+ * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit or the log compaction
Review Comment:
Minor: fix the docs. They talk about compaction, while the actual method is
for clustering.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String
partitionPath, String max
*/
Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime);
+ /**
+ * Stream all "merged" file-slices before on an instant time If a
file-group has a pending compaction request, the
Review Comment:
minor.
"before or on an ..."
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // Check for replace commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants =
getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+ .filter(instantPair -> !instantPair.getKey().isCompleted()
+ && (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // New instants will contains instants that are newly completed commits
or newly created pending rewrite commits
+ // (i.e. compaction, logcompaciton, replacecommit)
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants,
+ finishedOrRemovedReplaceCommitInstants, true);
} else {
// One or more timelines is empty
LOG.warn("One or more timelines is empty");
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
}
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+
HoodieTimeline newTimeline) {
+ Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+ return
oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant ->
{
+ if (newTimelineInstants.contains(instant)) {
Review Comment:
I guess this is what Danny was proposing:
```
private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
Function<HoodieTimeline, HoodieTimeline>
getPendingActionTimelineFromOld,
HoodieTimeline newTimeline,
String completedAction, String pendingAction) {
Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
return
getPendingActionTimelineFromOld.apply(oldTimeline).getInstantsAsStream().map(instant
-> {
if (newTimelineInstants.contains(instant)) {
return Pair.of(instant, instant);
} else {
HoodieInstant completedInstant =
new HoodieInstant(State.COMPLETED, completedAction,
instant.getTimestamp());
if (newTimelineInstants.contains(completedInstant)) {
return Pair.of(instant, completedInstant);
}
HoodieInstant inflightInstant =
new HoodieInstant(State.INFLIGHT, pendingAction,
instant.getTimestamp());
if (newTimelineInstants.contains(inflightInstant)) {
return Pair.of(instant, inflightInstant);
}
return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
}
}).collect(Collectors.toList());
}
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // Check for replace commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants =
getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, Boolean>>
finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+ .filter(instantPair -> !instantPair.getKey().isCompleted()
+ && (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) :
Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // New instants will contains instants that are newly completed commits
or newly created pending rewrite commits
+ // (i.e. compaction, logcompaciton, replacecommit)
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newInstants, finishedCompactionInstants,
finishedOrRemovedLogCompactionInstants,
+ finishedOrRemovedReplaceCommitInstants, true);
} else {
// One or more timelines is empty
LOG.warn("One or more timelines is empty");
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
}
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+
HoodieTimeline newTimeline) {
+ Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+ return
oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant ->
{
+ if (newTimelineInstants.contains(instant)) {
Review Comment:
Surya, next time you push an update, you can add this.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -953,6 +984,18 @@ public final Stream<FileSlice> getAllFileSlices(String
partitionStr) {
}
}
+ @Override
+ public final Stream<FileSlice> getAllFileSlices(String partitionStr, boolean
includePending) {
Review Comment:
same comment as above
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -159,7 +177,12 @@ private void runIncrementalSync(HoodieTimeline timeline,
TimelineDiffResult diff
} else if
(instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
addRollbackInstant(timeline, instant);
} else if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- addReplaceInstant(timeline, instant);
+ boolean isClusteringCommit =
ClusteringUtils.isClusteringCommit(metaClient, instant);
+ if (!instant.isCompleted() && isClusteringCommit) {
+ addPendingClusteringInstant(instant);
+ } else {
+ addReplaceInstant(timeline, instant);
Review Comment:
In L163, we already filter only for clustering replace commits. So wouldn't
this mean addCompletedClusteringInstant?
Or am I misunderstanding something?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant
instant) throws IOExce
}
/**
- * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit or the log compaction
+ * operation is failed, so it is no longer need to be tracked as pending.
*
* @param instant Log Compaction Instant to be removed
+ * @param isSuccessful true if log compaction operation is successful, false
if the operation is failed and rollbacked.
*/
- private void removePendingLogCompactionInstant(HoodieInstant instant) throws
IOException {
- LOG.info("Removing completed log compaction instant (" + instant + ")");
- HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
- .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ private void removePendingLogCompactionInstant(HoodieInstant instant,
boolean isSuccessful) throws IOException {
+ if (isSuccessful) {
+ LOG.info("Removing completed log compaction instant (" + instant + ")");
+ HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
+ .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ } else {
+ LOG.info("Removing failed log compaction instant (" + instant + ")");
+ removePendingLogCompactionOperations(instant.getTimestamp());
Review Comment:
Why don't I see this if/else with removePendingCompactionInstant? Can you
help me understand, please?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]