suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264721375
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
   }
   /**
-   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
-   * so you no longer need to track them as pending.
+   * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+   * operation is failed, so it is no longer need to be tracked as pending.
    *
    * @param instant Log Compaction Instant to be removed
+   * @param isSuccessful true if log compaction operation is successful, false if the operation is failed and rollbacked.
    */
-  private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed log compaction instant (" + instant + ")");
-    HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-    removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-        .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-            CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+  private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+    if (isSuccessful) {
+      LOG.info("Removing completed log compaction instant (" + instant + ")");
+      HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+      removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+          .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+              CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+    } else {
+      LOG.info("Removing failed log compaction instant (" + instant + ")");
+      removePendingLogCompactionOperations(instant.getTimestamp());
Review Comment:
The concern is similar to the one raised in the TimelineDiffHelper class; I explained the reasoning behind it there.
Also, one more point to note: compaction plans are by their nature immutable, whereas replacecommit plans can be either mutable or immutable, depending on which UpdateStrategy implementation users are using. That means the traditional flow cannot roll back compaction plans (except for a developer manually doing a restore), whereas a replacecommit can be rolled back.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,15 +137,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+    this.visibleFileSystemViewTimeline = visibleActiveTimeline
+        .getFileSystemViewTimeline();
   }
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-    HoodieTimer timer = HoodieTimer.start();
-    List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
+    HoodieTimer timer = new HoodieTimer().startTimer();
Review Comment:
Good catch, rebasing might have introduced it; fixed it now.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -130,36 +141,50 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
     });
     // Now remove pending log compaction instants which were completed or removed
-    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant -> {
+    diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instantPair -> {
+      try {
+        removePendingLogCompactionInstant(instantPair.getKey(), instantPair.getValue());
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
+    // Now remove pending replace instants which were completed or removed
+    diffResult.getFinishedOrRemovedReplaceInstants().stream().forEach(instantPair -> {
       try {
-        removePendingLogCompactionInstant(instant);
+        removePendingFileGroupsInPendingClustering(instantPair.getKey(), instantPair.getValue());
       } catch (IOException e) {
         throw new HoodieException(e);
       }
     });
-    // Add new completed instants found in the latest timeline, this also contains inflight instants.
-    diffResult.getNewlySeenInstants().stream()
-        .filter(instant -> instant.isCompleted()
-            || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
-            || instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION))
+    HoodieTimeline newFileSystemViewTimeline = newActiveTimeline.getFileSystemViewTimeline();
+    // Add new completed instants found in the latest timeline,
+    // this also contains inflight instants from rewrite timeline.
+    diffResult.getNewlySeenCompletedAndRewriteInstants()
         .forEach(instant -> {
           try {
             if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
                 || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
-              addCommitInstant(timeline, instant);
+              addCommitInstant(newFileSystemViewTimeline, instant);
Review Comment:
It is. If I pass the active timeline multiple times, we will have to filter and recreate the fileSystemViewTimeline each time, so I created it once and passed it to the methods that construct file groups. Also, all these methods are private and live in the same class, so they can share a common view when building file groups.
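The filter-once-and-share pattern described above can be sketched in plain Java. Everything here is an illustrative stand-in, not Hudi's API: the timeline is modelled as a list of action strings, and the filter condition is invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: derive the filtered "file system view" timeline once
// and hand the same object to every private helper, instead of re-filtering
// the active timeline inside each call.
public class TimelineReuseSketch {

    // Stand-in for newActiveTimeline.getFileSystemViewTimeline(); the
    // "rollback" filter here is a hypothetical example condition.
    static List<String> fileSystemViewTimeline(List<String> activeTimeline) {
        return activeTimeline.stream()
                .filter(action -> !action.equals("rollback"))
                .collect(Collectors.toList());
    }

    // Stand-in for a private helper such as addCommitInstant(timeline, instant):
    // it consumes the pre-filtered view and never filters again.
    static boolean containsInstant(List<String> filteredTimeline, String action) {
        return filteredTimeline.contains(action);
    }

    public static void main(String[] args) {
        List<String> active = Arrays.asList("commit", "deltacommit", "rollback");
        List<String> view = fileSystemViewTimeline(active); // filtered once
        System.out.println(containsInstant(view, "commit"));   // true
        System.out.println(containsInstant(view, "rollback")); // false
    }
}
```

Since the helpers are private to one class, sharing the pre-built view keeps them consistent with each other during a single incremental sync.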
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -90,9 +91,11 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   protected HoodieTableMetaClient metaClient;
-  // This is the commits timeline that will be visible for all views extending this view
-  // This is nothing but the write timeline, which contains both ingestion and compaction(major and minor) writers.
-  private HoodieTimeline visibleCommitsAndCompactionTimeline;
+  // Timeline determines whether a file slice is committed or not. This timeline object is attached
Review Comment:
Yeah, fixed it.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -150,11 +151,25 @@ public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClu
   public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(
       HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+    if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
+      return Stream.empty();
+    }
     Stream<HoodieFileGroupId> partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
     return partitionToFileIdLists.map(e -> Pair.of(e, instant));
   }
-  private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(
+  public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesFromClusteringInstant(
+      HoodieInstant clusteringInstant, HoodieTableMetaClient metaClient) throws IOException {
+    Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
+        ClusteringUtils.getRequestedReplaceMetadata(metaClient, clusteringInstant);
Review Comment:
isClusteringInstant checks already happen before calling this API.
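The null/empty guard added in the hunk above can be shown in isolation. This is a minimal sketch with the clustering plan's input groups modelled as a plain list of file-group id strings; none of the Hudi types appear here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Illustrative only: a missing or empty plan contributes an empty stream of
// file groups instead of causing a NullPointerException downstream.
public class ClusteringGuardSketch {

    static Stream<String> fileGroupsFromPlan(List<String> inputGroups) {
        // Guard mirrors: plan == null || plan.getInputGroups() == null || isEmpty()
        if (inputGroups == null || inputGroups.isEmpty()) {
            return Stream.empty();
        }
        return inputGroups.stream();
    }

    public static void main(String[] args) {
        System.out.println(fileGroupsFromPlan(null).count());                        // 0
        System.out.println(fileGroupsFromPlan(Collections.emptyList()).count());     // 0
        System.out.println(fileGroupsFromPlan(Arrays.asList("fg1", "fg2")).count()); // 2
    }
}
```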
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -478,8 +488,11 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
    * @param statuses List of File-Status
    */
   private Stream<HoodieBaseFile> convertFileStatusesToBaseFiles(FileStatus[] statuses) {
-    Predicate<FileStatus> roFilePredicate = fileStatus -> fileStatus.getPath().getName()
-        .contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
+    // Format of .hoodie_partition_metadata is changed to .hoodie_partition_metadata.parquet
Review Comment:
Ref:
https://github.com/apache/hudi/pull/5179/files#diff-c2a2767975107231dd51373bedd57cff81256256e0cb9ea2393b5af78f2c66dbR112
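The issue the comment refers to can be sketched with plain strings: once the partition metadata file can carry the base file extension (e.g. .hoodie_partition_metadata.parquet), a predicate based only on contains(extension) would wrongly pick it up. The constants and method below are illustrative assumptions, not Hudi's actual predicate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch: a base-file predicate must both match the base file
// extension and exclude the partition metadata file, because the metadata
// file may now share that extension.
public class BaseFilePredicateSketch {
    static final String BASE_FILE_EXT = ".parquet";
    static final String PARTITION_META_PREFIX = ".hoodie_partition_metadata";

    static List<String> baseFiles(List<String> fileNames) {
        Predicate<String> isBaseFile = name ->
                name.contains(BASE_FILE_EXT) && !name.startsWith(PARTITION_META_PREFIX);
        return fileNames.stream().filter(isBaseFile).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "abc123.parquet",
                ".hoodie_partition_metadata.parquet", // would pass a contains-only check
                "def456.log");
        System.out.println(baseFiles(names)); // [abc123.parquet]
    }
}
```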
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -299,6 +310,17 @@ void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
     });
   }
+  @Override
+  protected void removeFileGroupsInPendingClustering(String clusteringInstant) {
+    List<HoodieFileGroupId> fileGroupIdsToRemove = fgIdToPendingClustering.entrySet()
+        .stream().filter(fgIdInstantEntry -> clusteringInstant.equals(fgIdInstantEntry.getValue().getTimestamp()))
+        .map(Map.Entry::getKey).collect(Collectors.toList());
+    ValidationUtils.checkArgument(!fileGroupIdsToRemove.isEmpty(),
Review Comment:
If we have arrived at this point, that means the old timeline has the replacecommit in an inflight state and the new timeline has either completed it or rolled it back.
In that case, if we cannot find the file groups to remove, there may be some other issue. So it is good to fail the incremental file sync and let a full sync happen. If required, at a later point we could add monitoring for incremental sync to make sure everything is working as expected.
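The fail-fast behaviour described here can be sketched without Hudi types: collect the map entries keyed by file-group id whose value carries the given instant timestamp, throw if none are found (the signal for falling back to full sync), then drop them. Names and the exception type are stand-ins for the hypothetical equivalent of fgIdToPendingClustering and ValidationUtils.checkArgument.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative: removing pending-clustering file groups by instant timestamp,
// with an explicit failure when the expected entries are absent.
public class PendingClusteringRemovalSketch {

    static List<String> removeForInstant(Map<String, String> fgIdToInstant, String instant) {
        List<String> toRemove = fgIdToInstant.entrySet().stream()
                .filter(e -> instant.equals(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        if (toRemove.isEmpty()) {
            // Mirrors the checkArgument: an unexpected state aborts incremental
            // sync rather than silently succeeding, so a full sync can recover.
            throw new IllegalArgumentException("No file groups found for instant " + instant);
        }
        toRemove.forEach(fgIdToInstant::remove);
        return toRemove;
    }

    public static void main(String[] args) {
        Map<String, String> pending = new HashMap<>();
        pending.put("fg1", "t1");
        pending.put("fg2", "t2");
        System.out.println(removeForInstant(pending, "t1")); // [fg1]
        System.out.println(pending.containsKey("fg1"));      // false
    }
}
```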
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -144,44 +172,54 @@ private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTran
    */
   public static class TimelineDiffResult {
-    private final List<HoodieInstant> newlySeenInstants;
+    private final List<HoodieInstant> newlySeenCompletedAndRewriteInstants;
     private final List<HoodieInstant> finishedCompactionInstants;
Review Comment:
Ideally compactions are not removed, since after a compaction plan is created, new data is written to new file slices even while the plan is still in an inflight state. So if we were to remove a compaction, we might also need to remove the latest files. We could add APIs to remove them, but it is a bit risky.
Also, compaction plans are immutable, so writers cannot delete them; only developers can remove them manually by using restore operations.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -833,11 +832,11 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoP
     SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
     List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
     assertEquals(1, firstPartitionCommit2FileGroups.size());
-    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlicesIncludingInflight().collect(Collectors.toList()));
Review Comment:
Existing unit tests are written to read inflight file slices from the views, which is not evident from the APIs we have in the file system view classes.
For example, look at line 832 in this class: by passing the active timeline the tests expect to read inflight file slices, and by passing completedAndCompactionTimeline in the FileSystemViewManager class they read committed file slices. With this change, everyone has to pass the active timeline and use the appropriate APIs to read the respective slice streams.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -178,7 +203,15 @@ public Option<HoodieBaseFile> getLatestDataFile() {
    * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
    */
   public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
-    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+    return getLatestFileSliceBeforeOrOn(maxInstantTime, false);
+  }
+
+  /**
+   * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
+   */
+  public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime, boolean includePending) {
Review Comment:
I created these methods to fix broken test cases; existing unit tests already use the logic present in these methods, just in a different way.
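The overload pattern from the hunk above can be sketched generically: the one-argument method keeps its old committed-only behaviour by delegating with includePending = false, while tests opt into pending slices explicitly. Slice is a minimal stand-in, not Hudi's FileSlice, and string comparison stands in for compareTimestamps with LESSER_THAN_OR_EQUALS (valid for equal-width, zero-padded instant times).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative sketch of getLatestFileSliceBeforeOrOn and its new
// includePending overload, over a hand-rolled Slice stand-in.
public class FileSliceSketch {

    static class Slice {
        final String baseInstantTime;
        final boolean committed;

        Slice(String baseInstantTime, boolean committed) {
            this.baseInstantTime = baseInstantTime;
            this.committed = committed;
        }
    }

    // Slices are assumed ordered latest-first, as getAllFileSlices() returns them.
    static Optional<Slice> latestBeforeOrOn(List<Slice> slices, String maxInstantTime, boolean includePending) {
        return slices.stream()
                .filter(s -> includePending || s.committed)
                .filter(s -> s.baseInstantTime.compareTo(maxInstantTime) <= 0)
                .findFirst();
    }

    static Optional<Slice> latestBeforeOrOn(List<Slice> slices, String maxInstantTime) {
        return latestBeforeOrOn(slices, maxInstantTime, false); // old behaviour preserved
    }

    public static void main(String[] args) {
        List<Slice> slices = Arrays.asList(new Slice("003", false), new Slice("002", true));
        // Default overload skips the pending slice at 003.
        System.out.println(latestBeforeOrOn(slices, "003").get().baseInstantTime); // 002
        // Callers that need inflight slices opt in explicitly.
        System.out.println(latestBeforeOrOn(slices, "003", true).get().baseInstantTime); // 003
    }
}
```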
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]