codope commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264702031
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -833,11 +832,11 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
SyncableFileSystemView fsView =
getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
List<HoodieFileGroup> firstPartitionCommit2FileGroups =
fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionCommit2FileGroups.size());
-
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlicesIncludingInflight().collect(Collectors.toList()));
Review Comment:
Why are we including inflight slices as well?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String
partitionPath, String max
*/
Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime);
+ /**
+ * Stream all "merged" file-slices before or on an instant time If a
file-group has a pending compaction request, the
+ * file-slice before and after compaction request instant is merged and
returned. This API can also include pending instants.
+ *
+ * @param partitionPath Partition Path
+ * @param maxInstantTime Max Instant Time
+ * @param includePending Ability to include pending instants as well
+ * @return
+ */
+ Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime, boolean includePending);
Review Comment:
Please add tests for the new methods.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant
instant) throws IOExce
}
/**
- * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is
converted to delta commit or the log compaction
+ * operation is failed, so it is no longer need to be tracked as pending.
*
* @param instant Log Compaction Instant to be removed
+ * @param isSuccessful true if log compaction operation is successful, false
if the operation is failed and rollbacked.
*/
- private void removePendingLogCompactionInstant(HoodieInstant instant) throws
IOException {
- LOG.info("Removing completed log compaction instant (" + instant + ")");
- HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
- .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ private void removePendingLogCompactionInstant(HoodieInstant instant,
boolean isSuccessful) throws IOException {
+ if (isSuccessful) {
+ LOG.info("Removing completed log compaction instant (" + instant + ")");
+ HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+
removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant,
plan)
+ .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+
CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ } else {
+ LOG.info("Removing failed log compaction instant (" + instant + ")");
+ removePendingLogCompactionOperations(instant.getTimestamp());
Review Comment:
Yes, I think we should apply the same to the clustering commit too.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,15 +137,16 @@ protected void init(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActi
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
- this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getWriteTimeline();
+ this.visibleFileSystemViewTimeline = visibleActiveTimeline
+ .getFileSystemViewTimeline();
}
/**
* Adds the provided statuses into the file system view, and also caches it
inside this object.
*/
public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
- HoodieTimer timer = HoodieTimer.start();
- List<HoodieFileGroup> fileGroups = buildFileGroups(statuses,
visibleCommitsAndCompactionTimeline, true);
+ HoodieTimer timer = new HoodieTimer().startTimer();
Review Comment:
Is there a need to change this? It re-introduces deprecated code.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -37,27 +38,31 @@ public class TimelineDiffHelper {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineDiffHelper.class);
- public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
- HoodieTimeline newTimeline) {
-
- HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
- HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
+ /**
+ * It performs validation to see if the incremental sync is possible. Once
it determines it is safe to do,
+ * It tracks both the instants that are newly added and also instants that
have transitioned from pending
+ * state to completed or removed.
+ * @param oldCompletedAndRewriteTimeline old completed and rewrite timeline
+ * @param newCompletedAndRewriteTimeline new completed and rewrite timeline
+ * @return TimelineDiffResult object which is useful for incrementally
syncing the timeline.
+ */
+ public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline oldCompletedAndRewriteTimeline,
Review Comment:
Let's make sure that we add tests for the diff helper as it is used for MDT
based fs view too.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -299,6 +310,17 @@ void
removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
});
}
+ @Override
+ protected void removeFileGroupsInPendingClustering(String clusteringInstant)
{
+ List<HoodieFileGroupId> fileGroupIdsToRemove =
fgIdToPendingClustering.entrySet()
+ .stream().filter(fgIdInstantEntry ->
clusteringInstant.equals(fgIdInstantEntry.getValue().getTimestamp()))
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ ValidationUtils.checkArgument(!fileGroupIdsToRemove.isEmpty(),
Review Comment:
Is this validation strictly necessary? Even if nothing is found,
incremental sync can still continue, can't it?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -163,14 +163,19 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterCompletedAndCompactionInstants();
- HoodieTimeline filterCompletedOrMajorOrMinorCompactionInstants();
+ /**
+ * File system view that is used for constructing file groups.
+ * This is the filtered timeline contains completed instants that are part
of either
+ * write timeline or savepoint timeline. Along with this it also includes
any pending compaction commits.
+ */
+ HoodieTimeline getFileSystemViewTimeline();
Review Comment:
Why should it not include rollback and restore timeline? Let's make it clear
in the documentation.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -178,7 +203,15 @@ public Option<HoodieBaseFile> getLatestDataFile() {
* Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
*/
public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime)
{
- return Option.fromJavaOptional(getAllFileSlices().filter(slice ->
compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS,
maxInstantTime)).findFirst());
+ return getLatestFileSliceBeforeOrOn(maxInstantTime, false);
+ }
+
+ /**
+ * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
+ */
+ public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime,
boolean includePending) {
Review Comment:
let's make sure we have tests covering all these methods.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -150,11 +151,25 @@ public static Map<HoodieFileGroupId, HoodieInstant>
getAllFileGroupsInPendingClu
public static Stream<Pair<HoodieFileGroupId, HoodieInstant>>
getFileGroupsInPendingClusteringInstant(
HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+ if (clusteringPlan == null || clusteringPlan.getInputGroups() == null ||
clusteringPlan.getInputGroups().isEmpty()) {
+ return Stream.empty();
+ }
Stream<HoodieFileGroupId> partitionToFileIdLists =
clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
return partitionToFileIdLists.map(e -> Pair.of(e, instant));
}
- private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>>
getFileGroupEntriesInClusteringPlan(
+ public static Stream<Pair<HoodieFileGroupId, HoodieInstant>>
getFileGroupEntriesFromClusteringInstant(
+ HoodieInstant clusteringInstant, HoodieTableMetaClient metaClient)
throws IOException {
+ Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
+ ClusteringUtils.getRequestedReplaceMetadata(metaClient,
clusteringInstant);
Review Comment:
Note that for the `insert_overwrite` operation, the requested instant does
not contain the clustering plan, so line 165 below might throw an NPE.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -478,8 +488,11 @@ protected FileStatus[] listPartition(Path partitionPath)
throws IOException {
* @param statuses List of File-Status
*/
private Stream<HoodieBaseFile> convertFileStatusesToBaseFiles(FileStatus[]
statuses) {
- Predicate<FileStatus> roFilePredicate = fileStatus ->
fileStatus.getPath().getName()
-
.contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
+ // Format of .hoodie_partition_metadata is changed to
.hoodie_partition_metadata.parquet
Review Comment:
When did this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]