This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 31dbcfe0896d [HUDI-9667] Incorporate completion time into restore
logic (#13653)
31dbcfe0896d is described below
commit 31dbcfe0896d91d5e753873bb6bbb3f4169fc72f
Author: Tim Brown <[email protected]>
AuthorDate: Mon Aug 4 05:31:17 2025 -0400
[HUDI-9667] Incorporate completion time into restore logic (#13653)
* fix restore sequence to be in completion reverse order, still using
requested time comparison for compaction
* add a custom comparator for the restore instant sort
---------
Co-authored-by: danny0405 <[email protected]>
---
.../rollback/ListingBasedRollbackStrategy.java | 109 ++---
.../rollback/RestoreInstantComparatorFactory.java | 61 +++
.../action/rollback/RestorePlanActionExecutor.java | 35 +-
.../hudi/table/action/rollback/RollbackHelper.java | 5 +-
.../table/action/savepoint/SavepointHelpers.java | 16 +-
.../hudi/testutils/HoodieClientTestBase.java | 9 +-
.../common/table/timeline/BaseHoodieTimeline.java | 10 +-
.../table/timeline/CompletionTimeQueryView.java | 4 +-
.../hudi/common/table/timeline/HoodieTimeline.java | 8 +-
.../common/table/timeline/InstantComparator.java | 2 +-
.../versioning/common/InstantComparators.java | 9 +
.../table/timeline/TestInstantComparators.java | 56 +++
.../hudi/common/testutils/HoodieTestUtils.java | 2 +-
.../hudi/functional/TestHoodieFileSystemViews.java | 12 +-
.../TestSavepointRestoreMergeOnRead.java | 481 +++++++++++++++++++--
.../hudi/functional/TestRecordLevelIndex.scala | 40 +-
16 files changed, 695 insertions(+), 164 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 28905f75f75d..9a5f2e3fc64d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -122,11 +121,10 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
- Supplier<List<StoragePathInfo>> filesToDelete = () -> {
+ Supplier<List<StoragePath>> filesToDelete = () -> {
try {
return fetchFilesFromInstant(instantToRollback, partitionPath,
metaClient.getBasePath().toString(), baseFileExtension,
- metaClient.getStorage(),
- commitMetadataOptional, isCommitMetadataCompleted, tableType);
+ metaClient.getStorage(), commitMetadataOptional,
isCommitMetadataCompleted, tableType,
metaClient.getTableConfig().getTableVersion());
} catch (IOException e) {
throw new HoodieIOException("Fetching files to delete error", e);
}
@@ -164,11 +162,10 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
} else {
// if this is part of a restore operation, we should
rollback/delete entire file slice.
// For table version 6, the files can be directly fetched from
the instant to rollback
- // For table version 8, the files are computed based on
completion time. All files completed after
- // the requested time of instant to rollback are included
-
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
isTableVersionLessThanEight ? filesToDelete.get() :
- listAllFilesSinceCommit(instantToRollback.requestedTime(),
baseFileExtension, partitionPath,
- metaClient)));
+ // For table version 8, the log files are not directly
associated with the base file.
+ // The rollback will iterate in reverse order based on
completion time so the log files completed
+ // after the compaction will already be queued for removal and
therefore, only the files from the compaction commit must be deleted.
+
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
filesToDelete.get()));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
@@ -280,32 +277,11 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return hoodieRollbackRequests;
}
- private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
- String
baseFileExtension,
- String partitionPath,
- HoodieTableMetaClient
metaClient) throws IOException {
- LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
- CompletionTimeQueryView completionTimeQueryView =
metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient);
- StoragePathFilter filter = (path) -> {
- if (path.toString().contains(baseFileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return compareTimestamps(commit, LESSER_THAN_OR_EQUALS,
- fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
- String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
- return completionTimeQueryView.isSlicedAfterOrOn(commit,
fileCommitTime);
- }
- return false;
- };
- return metaClient.getStorage()
- .listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
- }
-
@NotNull
- private List<HoodieRollbackRequest> getHoodieRollbackRequests(String
partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
+ private List<HoodieRollbackRequest> getHoodieRollbackRequests(String
partitionPath, List<StoragePath> filesToDeletedStatus) {
return filesToDeletedStatus.stream()
.map(pathInfo -> {
- String dataFileToBeDeleted = pathInfo.getPath().toString();
+ String dataFileToBeDeleted = pathInfo.toString();
return formatDeletePath(dataFileToBeDeleted);
})
.map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
@@ -317,11 +293,11 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return path.substring(path.indexOf(":") + 1);
}
- private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
- String
basefileExtension,
- String partitionPath,
- HoodieStorage
storage) throws IOException {
- LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+ String basefileExtension,
+ String partitionPath,
+ HoodieStorage storage)
throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path {} and
commit {}", partitionPath, commit);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
@@ -329,44 +305,33 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
}
return false;
};
- return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
+ return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath),
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}
- private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant
instantToRollback,
- String partitionPath,
String basePath,
- String
baseFileExtension, HoodieStorage storage,
-
Option<HoodieCommitMetadata> commitMetadataOptional,
- Boolean
isCommitMetadataCompleted,
- HoodieTableType
tableType) throws IOException {
- // go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolledback.
- if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
- return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, storage);
+ private List<StoragePath> fetchFilesFromInstant(HoodieInstant
instantToRollback,
+ String partitionPath, String
basePath,
+ String baseFileExtension,
HoodieStorage storage,
+ Option<HoodieCommitMetadata>
commitMetadataOptional,
+ boolean
isCommitMetadataCompleted,
+ HoodieTableType tableType,
+ HoodieTableVersion
tableVersion) throws IOException {
+ // for MOR tables with version < 8, listing is required to fetch the log
files associated with base files added by this commit.
+ if (isCommitMetadataCompleted && (tableType ==
HoodieTableType.COPY_ON_WRITE ||
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
+ return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(), baseFileExtension);
} else {
return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, storage);
}
}
- private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
- String
partitionPath,
- String basePath,
-
HoodieCommitMetadata commitMetadata,
- String
baseFileExtension,
- HoodieStorage
storage) throws IOException {
+ private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
+ String partitionPath,
+ String basePath,
+ HoodieCommitMetadata
commitMetadata,
+ String
baseFileExtension) {
StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.requestedTime());
- List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath,
commitMetadata, partitionPath)
- .filter(entry -> {
- try {
- return storage.exists(entry);
- } catch (Exception e) {
- LOG.error("Exists check failed for " + entry.toString(), e);
- }
- // if any Exception is thrown, do not ignore. let's try to add the
file of interest to be deleted. we can't miss any files to be rolled back.
- return true;
- }).collect(Collectors.toList());
-
- return storage.listDirectEntries(filePaths, pathFilter);
+ return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
+ .filter(pathFilter::accept).collect(Collectors.toList());
}
/**
@@ -379,15 +344,15 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
* @return
* @throws IOException
*/
- private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant
instantToRollback,
- String partitionPath,
- String basePath,
- String
baseFileExtension,
- HoodieStorage storage)
throws IOException {
+ private List<StoragePath> fetchFilesFromListFiles(HoodieInstant
instantToRollback,
+ String partitionPath,
+ String basePath,
+ String baseFileExtension,
+ HoodieStorage storage)
throws IOException {
StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.requestedTime());
List<StoragePath> filePaths = listFilesToBeDeleted(basePath,
partitionPath);
- return storage.listDirectEntries(filePaths, pathFilter);
+ return storage.listDirectEntries(filePaths,
pathFilter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}
private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
new file mode 100644
index 000000000000..224115bb1de0
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestoreInstantComparatorFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparator;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Comparator specifically for computing the instant order when computing the
instants to rollback as part of a restore operation.
+ * The order relies on the completion time of the instants, except for
compaction instants on Merge-on-Read tables. These instants may be completed
after a
+ * delta-commit but should still be considered earlier since the log files
from the next delta-commit become associated with the base files from this
compaction.
+ * For example if we have the following sequence of commits (DC=delta-commit,
C=compaction):
+ * ... DC-10 starts -> DC-10 ends -> compaction-1 starts -> delta-commit-11
starts -> delta-commit-11 ends -> compaction-1 ends ...
+ * If we restore to delta-commit-11, we do not roll back the compaction-1
instant, even though it finished after delta-commit-11.
+ */
+public class RestoreInstantComparatorFactory {
+ private static final Set<String> COMPACTION_ACTIONS =
Stream.of(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.COMMIT_ACTION).collect(Collectors.toSet());
+
+ public static Comparator<HoodieInstant>
createComparator(HoodieTableMetaClient metaClient) {
+ InstantComparator instantComparator =
metaClient.getTimelineLayout().getInstantComparator();
+ HoodieTableType tableType = metaClient.getTableType();
+ if (tableType == HoodieTableType.COPY_ON_WRITE) {
+ return (o1, o2) ->
instantComparator.completionTimeOrderedComparator().compare(o1, o2);
+ } else {
+      return (o1, o2) -> {
+        // Due to the special handling of compaction instants, we need to use
a requested-time-based comparator for compaction instants
+        // but a completion-time-based comparator for others
+ if (COMPACTION_ACTIONS.contains(o1.getAction()) ||
COMPACTION_ACTIONS.contains(o2.getAction())) {
+ return
instantComparator.requestedTimeOrderedComparator().compare(o1, o2);
+ } else {
+ return
instantComparator.completionTimeOrderedComparator().compare(o1, o2);
+ }
+ };
+ }
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index 42af5c2f941f..169b8849edbb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -22,30 +22,34 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Comparator;
import java.util.List;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
/**
* Plans the restore action and add a restore.requested meta file to timeline.
*/
public class RestorePlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {
-
private static final Logger LOG =
LoggerFactory.getLogger(RestorePlanActionExecutor.class);
public static final Integer RESTORE_PLAN_VERSION_1 = 1;
@@ -65,7 +69,8 @@ public class RestorePlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T,
@Override
public Option<HoodieRestorePlan> execute() {
final HoodieInstant restoreInstant =
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.RESTORE_ACTION, instantTime);
- try {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ try (CompletionTimeQueryView completionTimeQueryView =
metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient))
{
// Get all the commits on the timeline after the provided commit time
// rollback pending clustering instants first before other instants (See
HUDI-3362)
List<HoodieInstant> pendingClusteringInstantsToRollback =
table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline()
@@ -75,10 +80,14 @@ public class RestorePlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T,
.filter(instant -> GREATER_THAN.test(instant.requestedTime(),
savepointToRestoreTimestamp))
.collect(Collectors.toList());
- // Get all the commits on the timeline after the provided commit time
+ // Get all the commits on the timeline after the provided commit's
completion time unless it is the SOLO_COMMIT_TIMESTAMP which indicates there
are no commits for the table
+ String completionTime =
savepointToRestoreTimestamp.equals(SOLO_COMMIT_TIMESTAMP) ?
savepointToRestoreTimestamp :
completionTimeQueryView.getCompletionTime(savepointToRestoreTimestamp)
+ .orElseThrow(() -> new HoodieException("Unable to find completion
time for instant: " + savepointToRestoreTimestamp));
+
+ Predicate<HoodieInstant> instantFilter =
constructInstantFilter(completionTime);
List<HoodieInstant> commitInstantsToRollback =
table.getActiveTimeline().getWriteTimeline()
- .getReverseOrderedInstants()
- .filter(instant -> GREATER_THAN.test(instant.requestedTime(),
savepointToRestoreTimestamp))
+ .getReverseOrderedInstantsByCompletionTime()
+ .filter(instantFilter)
.filter(instant ->
!pendingClusteringInstantsToRollback.contains(instant))
.collect(Collectors.toList());
@@ -90,11 +99,17 @@ public class RestorePlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T,
HoodieRestorePlan restorePlan = new
HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION,
savepointToRestoreTimestamp);
table.getActiveTimeline().saveToRestoreRequested(restoreInstant,
restorePlan);
table.getMetaClient().reloadActiveTimeline();
- LOG.info("Requesting Restore with instant time " + restoreInstant);
+ LOG.info("Requesting Restore with instant time {}", restoreInstant);
return Option.of(restorePlan);
- } catch (HoodieIOException e) {
- LOG.error("Got exception when saving restore requested file", e);
- throw e;
+ } catch (Exception e) {
+ throw new HoodieException("Unable to restore to instant: " +
savepointToRestoreTimestamp, e);
}
}
+
+ private Predicate<HoodieInstant> constructInstantFilter(String
completionTime) {
+ HoodieInstant instantToRestoreTo =
table.getMetaClient().getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.RESTORE_ACTION, savepointToRestoreTimestamp, completionTime);
+ Comparator<HoodieInstant> comparator =
RestoreInstantComparatorFactory.createComparator(table.getMetaClient());
+ return instant -> comparator.compare(instant, instantToRestoreTo) > 0;
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index 4f82bad0e669..94c7ac33ce99 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -131,9 +130,7 @@ public class RollbackHelper implements Serializable {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient,
filesToBeDeleted, doDelete);
- List<Pair<String, HoodieRollbackStat>> partitionToRollbackStats = new
ArrayList<>();
- rollbackStats.forEach(entry ->
partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
- return partitionToRollbackStats.stream();
+ return rollbackStats.stream().map(entry ->
Pair.of(entry.getPartitionPath(), entry));
} else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
HoodieLogFormat.Writer writer = null;
final StoragePath filePath;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
index 149831f573cc..60ce9b390c98 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
@@ -24,10 +24,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.RestoreInstantComparatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Comparator;
+
public class SavepointHelpers {
private static final Logger LOG =
LoggerFactory.getLogger(SavepointHelpers.class);
@@ -36,26 +39,29 @@ public class SavepointHelpers {
HoodieInstant savePoint =
table.getMetaClient().createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
boolean isSavepointPresent =
table.getCompletedSavepointTimeline().containsInstant(savePoint);
if (!isSavepointPresent) {
- LOG.warn("No savepoint present " + savepointTime);
+ LOG.warn("No savepoint present {}", savepointTime);
return;
}
table.getActiveTimeline().revertToInflight(savePoint);
table.getActiveTimeline().deleteInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.SAVEPOINT_ACTION,
savepointTime));
- LOG.info("Savepoint " + savepointTime + " deleted");
+ LOG.info("Savepoint {} deleted", savepointTime);
}
public static void validateSavepointRestore(HoodieTable table, String
savepointTime) {
// Make sure the restore was successful
table.getMetaClient().reloadActiveTimeline();
- Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+ // Validate that the restore has returned the timeline to the anticipated
state
+ Comparator<HoodieInstant> instantComparator =
RestoreInstantComparatorFactory.createComparator(table.getMetaClient());
+ Option<HoodieInstant> lastInstant =
Option.fromJavaOptional(table.getActiveTimeline()
.getWriteTimeline()
.filterCompletedAndCompactionInstants()
- .lastInstant();
+ .getInstantsAsStream()
+ .max(instantComparator));
ValidationUtils.checkArgument(lastInstant.isPresent());
ValidationUtils.checkArgument(lastInstant.get().requestedTime().equals(savepointTime),
- savepointTime + " is not the last commit after restoring to savepoint,
last commit was "
+ () -> savepointTime + " is not the last commit after restoring to
savepoint, last commit was "
+ lastInstant.get().requestedTime());
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 164a7e621478..f08f1d580b3a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -23,12 +23,14 @@ import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
@@ -660,10 +662,15 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
* @param baseRecordsToUpdate The base records to update
*/
@SuppressWarnings("rawtypes, unchecked")
- protected void updateBatchWithoutCommit(String newCommitTime,
List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+ protected void updateBatchWithoutCommit(String newCommitTime,
List<HoodieRecord> baseRecordsToUpdate, HoodieTableVersion tableVersion) throws
IOException {
HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withRollbackUsingMarkers(true)
.withHeartbeatTolerableMisses(0)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
index 0eaedda3492d..331db8713faa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
@@ -27,9 +27,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
@@ -59,8 +56,6 @@ import static
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
*/
public abstract class BaseHoodieTimeline implements HoodieTimeline {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseHoodieTimeline.class);
-
private static final long serialVersionUID = 1L;
private static final String HASHING_ALGORITHM = "SHA-256";
@@ -506,6 +501,11 @@ public abstract class BaseHoodieTimeline implements
HoodieTimeline {
.sorted(instantComparator.completionTimeOrderedComparator());
}
+ @Override
+ public Stream<HoodieInstant> getReverseOrderedInstantsByCompletionTime() {
+ return
getInstantsAsStream().sorted(instantComparator.completionTimeOrderedComparator().reversed());
+ }
+
@Override
public boolean isBeforeTimelineStarts(String instant) {
Option<HoodieInstant> firstNonSavepointCommit =
getFirstNonSavepointCommit();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 1b4b99146e69..0678b60aba8a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -56,11 +56,11 @@ public interface CompletionTimeQueryView extends
AutoCloseable {
/**
* Queries the completion time with given instant time.
*
- * @param instantTime The instant time.
+ * @param requestedTime The time the commit was requested.
*
* @return The completion time if the instant finished or empty if it is
still pending.
*/
- Option<String> getCompletionTime(String beginTime);
+ Option<String> getCompletionTime(String requestedTime);
/**
* Queries the instant times with given completion time range.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index c923f697cf16..3a1b35d00d15 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -606,13 +606,19 @@ public interface HoodieTimeline extends
HoodieInstantReader, Serializable {
*/
Stream<HoodieInstant> getReverseOrderedInstants();
+ /**
+ * Get the stream of instants in reverse order by completion time. Any
incomplete instants are returned as the first elements of the stream.
+ * @return a stream of sorted instants
+ */
+ Stream<HoodieInstant> getReverseOrderedInstantsByCompletionTime();
+
/**
* @return the latest completion time of the instants
*/
Option<String> getLatestCompletionTime();
/**
- * Get the stream of instants in order by completion timestamp of actions.
+ * Get the stream of completed instants in order by completion timestamp of
actions.
*/
Stream<HoodieInstant> getInstantsOrderedByCompletionTime();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
index bd1b761150ab..d15831a9fb80 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/InstantComparator.java
@@ -36,7 +36,7 @@ public interface InstantComparator extends Serializable {
Comparator<HoodieInstant> requestedTimeOrderedComparator();
/**
- * Returns comparaor that orders primarily based on completion time and
secondary ordering based on {@link #requestedTimeOrderedComparator()}.
+ * Returns comparator that orders primarily based on completion time and
secondary ordering based on {@link #requestedTimeOrderedComparator()}.
* @return
*/
Comparator<HoodieInstant> completionTimeOrderedComparator();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
index 518b5bc93821..e1b42eeecf16 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java
@@ -79,6 +79,15 @@ public class InstantComparators {
@Override
public int compare(HoodieInstant instant1, HoodieInstant instant2) {
+ if (instant1.getCompletionTime() == null && instant2.getCompletionTime()
!= null) {
+ return 1; // instant1 is not completed, so it is greater than instant2
+ }
+ if (instant2.getCompletionTime() == null && instant1.getCompletionTime()
!= null) {
+ return -1; // instant2 is not completed, so it is greater than instant1
+ }
+ if (instant1.getCompletionTime() == null && instant2.getCompletionTime()
== null) {
+ return timestampBasedComparator.compare(instant1, instant2); // both
are not completed, compare by requested time
+ }
int res =
instant1.getCompletionTime().compareTo(instant2.getCompletionTime());
if (res == 0) {
res = timestampBasedComparator.compare(instant1, instant2);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
new file mode 100644
index 000000000000..f8b51f79ff69
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestInstantComparators.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import
org.apache.hudi.common.table.timeline.versioning.common.InstantComparators;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestInstantComparators {
+ @Test
+ void testCompletionTimeOrdering() {
+ HoodieInstant instant1 = createCompletedHoodieInstant("001", "002");
+ HoodieInstant instant2 = createCompletedHoodieInstant("003", "005");
+ HoodieInstant instant3 = createCompletedHoodieInstant("002", "004");
+ HoodieInstant instant4 = createInflightHoodieInstant("004");
+ HoodieInstant instant5 = createInflightHoodieInstant("009");
+
+ Comparator<HoodieInstant> comparator = new
InstantComparators.CompletionTimeBasedComparator(Collections.singletonMap(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.COMMIT_ACTION));
+ List<HoodieInstant> instants = Arrays.asList(instant5, instant3, instant1,
instant4, instant2);
+ instants.sort(comparator);
+ assertEquals(Arrays.asList(instant1, instant3, instant2, instant4,
instant5), instants);
+ }
+
+ private static HoodieInstant createCompletedHoodieInstant(String
requestedTime, String completionTime) {
+ return new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, requestedTime, completionTime,
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ }
+
+ private static HoodieInstant createInflightHoodieInstant(String
requestedTime) {
+ return new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, requestedTime,
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 79b32c2b1544..6bd86f60a95f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -243,7 +243,7 @@ public class HoodieTestUtils {
String keyGen =
properties.getProperty("hoodie.datasource.write.keygenerator.class");
if (!Objects.equals(keyGen,
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
&&
!properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
- builder.setPartitionFields("some_nonexistent_field");
+ builder.setPartitionFields("partition_path");
}
builder.fromProperties(properties);
return builder;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
index 744c331b65f8..6807f6cbcbb2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
@@ -229,7 +229,7 @@ public class TestHoodieFileSystemViews extends
HoodieClientTestBase {
}
}
- private void assertForFSVEquality(HoodieTableFileSystemView fsv1,
HoodieTableFileSystemView fsv2, boolean enableMdt, Option<HoodieCommitMetadata>
commitMetadataOpt) {
+ public static void assertForFSVEquality(HoodieTableFileSystemView fsv1,
HoodieTableFileSystemView fsv2, boolean enableMdt, Option<HoodieCommitMetadata>
commitMetadataOpt) {
List<String> allPartitionNames =
Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH,
DEFAULT_THIRD_PARTITION_PATH);
fsv1.loadPartitions(allPartitionNames);
if (enableMdt) {
@@ -255,7 +255,7 @@ public class TestHoodieFileSystemViews extends
HoodieClientTestBase {
});
}
- private void assertBaseFileListEquality(List<HoodieBaseFile> baseFileList1,
List<HoodieBaseFile> baseFileList2) {
+ static void assertBaseFileListEquality(List<HoodieBaseFile> baseFileList1,
List<HoodieBaseFile> baseFileList2) {
assertEquals(baseFileList1.size(), baseFileList2.size());
Map<String, HoodieBaseFile> fileNameToBaseFileMap1 = new HashMap<>();
baseFileList1.forEach(entry -> {
@@ -271,14 +271,14 @@ public class TestHoodieFileSystemViews extends
HoodieClientTestBase {
});
}
- public void assertBaseFileEquality(HoodieBaseFile baseFile1, HoodieBaseFile
baseFile2) {
+ static void assertBaseFileEquality(HoodieBaseFile baseFile1, HoodieBaseFile
baseFile2) {
assertEquals(baseFile1.getFileName(), baseFile2.getFileName());
assertEquals(baseFile1.getFileId(), baseFile2.getFileId());
assertEquals(baseFile1.getFileLen(), baseFile2.getFileLen());
assertEquals(baseFile1.getFileSize(), baseFile2.getFileSize());
}
- private void assertFileSliceListEquality(List<FileSlice> fileSlices1,
List<FileSlice> fileSlices2, Option<HoodieCommitMetadata> commitMetadataOpt) {
+ static void assertFileSliceListEquality(List<FileSlice> fileSlices1,
List<FileSlice> fileSlices2, Option<HoodieCommitMetadata> commitMetadataOpt) {
assertEquals(fileSlices1.size(), fileSlices1.size());
Map<Pair<String, String>, FileSlice> fileNameToFileSliceMap1 = new
HashMap<>();
fileSlices1.forEach(entry -> {
@@ -294,7 +294,7 @@ public class TestHoodieFileSystemViews extends
HoodieClientTestBase {
});
}
- private void assertFileSliceEquality(FileSlice fileSlice1, FileSlice
fileSlice2, Option<HoodieCommitMetadata> commitMetadataOpt) {
+ static void assertFileSliceEquality(FileSlice fileSlice1, FileSlice
fileSlice2, Option<HoodieCommitMetadata> commitMetadataOpt) {
assertEquals(fileSlice1.getBaseFile().isPresent(),
fileSlice2.getBaseFile().isPresent());
if (fileSlice1.getBaseFile().isPresent()) {
assertBaseFileEquality(fileSlice1.getBaseFile().get(),
fileSlice2.getBaseFile().get());
@@ -324,7 +324,7 @@ public class TestHoodieFileSystemViews extends
HoodieClientTestBase {
}
}
- private void assertLogFileEquality(HoodieLogFile logFile1, HoodieLogFile
logFile2) {
+ static void assertLogFileEquality(HoodieLogFile logFile1, HoodieLogFile
logFile2) {
assertEquals(logFile1.getFileName(), logFile2.getFileName());
assertEquals(logFile1.getFileId(), logFile2.getFileId());
assertEquals(logFile1.getLogVersion(), logFile2.getLogVersion());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
index 7e7a1d537dda..24ff620c8d08 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
@@ -21,30 +21,54 @@ package org.apache.hudi.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.functional.TestHoodieFileSystemViews.assertForFSVEquality;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -52,6 +76,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@Tag("functional")
public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+ // Overrides setup to avoid creating a table with the default table version
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts();
+ initTestDataGenerator();
+ initHoodieStorage();
+ initTimelineService();
+ }
/**
* Actions: DC1, DC2, DC3, savepoint DC3,(snapshot query) DC4, C5, DC6, DC7.
restore to DC3.
@@ -69,15 +102,13 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
* snapshot query: total rec matches.
* checking the row count by updating columns in (val4,val5,val6, val7).
*/
- @Test
- void testCleaningDeltaCommits() throws Exception {
- HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
- .withInlineCompaction(false)
- .build())
- .withRollbackUsingMarkers(true)
- .build();
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ void testCleaningDeltaCommits(HoodieTableVersion tableVersion) throws
Exception {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
+ .withInlineCompaction(false).build(), tableVersion);
+ Map<String, Integer> commitToRowCount = new HashMap<>();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
String savepointCommit = null;
final int numRecords = 10;
@@ -88,6 +119,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
client.commit(newCommitTime, client.insert(writeRecords,
newCommitTime));
+ commitToRowCount.put(newCommitTime, numRecords);
if (i == 3) {
// trigger savepoint
savepointCommit = newCommitTime;
@@ -127,17 +159,53 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
filter).size());
}
}
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
}
- @Test
- public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws
IOException {
- HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(4)
- .withInlineCompaction(true)
- .build())
- .withRollbackUsingMarkers(true)
- .build();
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ public void testRestoreToWithInflightDeltaCommit(HoodieTableVersion
tableVersion) throws IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(false)
+ .compactionSmallFileSize(0).build(), tableVersion);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // establish base files
+ String firstCommitTime = client.startCommit();
+ int numRecords = 10;
+ List<HoodieRecord> initialRecords =
dataGen.generateInserts(firstCommitTime, numRecords);
+ client.commit(firstCommitTime,
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+ // add updates that go to log files
+ String secondCommitTime = client.startCommit();
+ List<HoodieRecord> updatedRecords =
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+ client.commit(secondCommitTime,
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+ // create a savepoint
+ client.savepoint("user1", "Savepoint for 2nd commit");
+ // add a third delta commit
+ String thirdCommitTime = client.startCommit();
+ List<HoodieRecord> updatedRecordsToBeRolledBack =
dataGen.generateUniqueUpdates(thirdCommitTime, numRecords);
+ client.commit(thirdCommitTime,
client.upsert(jsc.parallelize(updatedRecordsToBeRolledBack, 1),
thirdCommitTime));
+ // add a fourth delta commit but leave it in-flight
+ String fourthCommitTime = client.startCommit();
+ List<HoodieRecord> inFlightRecords =
Stream.concat(dataGen.generateUniqueUpdates(fourthCommitTime,
numRecords).stream(),
+ dataGen.generateInserts(fourthCommitTime,
numRecords).stream()).collect(Collectors.toList());
+ // collect result to trigger file creation
+ List<WriteStatus> writes =
client.upsert(jsc.parallelize(inFlightRecords, 1), fourthCommitTime).collect();
+ // restore to the savepoint
+ client.restoreToSavepoint(secondCommitTime);
+ // Validate metadata and records match expectations
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(secondCommitTime, numRecords),
getRecordCountPerCommit());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ public void
testRestoreWithFileGroupCreatedWithDeltaCommits(HoodieTableVersion
tableVersion) throws IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(true).build(), tableVersion);
final int numRecords = 100;
String firstCommit;
String secondCommit;
@@ -178,6 +246,11 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
.compactionSmallFileSize(0)
.build())
.withRollbackUsingMarkers(true)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
.build();
// add 2 more updates which will create log files.
@@ -214,6 +287,71 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(),
pPath),
filter).size());
}
+ validateFilesMetadata(hoodieWriteConfig);
+ Map<String, Integer> actualCommitToRowCount = getRecordCountPerCommit();
+ assertEquals(Collections.singletonMap(firstCommit, numRecords),
actualCommitToRowCount);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ public void testRestoreWithUpdatesToClusteredFileGroups(HoodieTableVersion
tableVersion) throws IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(
+
HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(4).withInlineCompaction(false).compactionSmallFileSize(0).build(),
+
HoodieClusteringConfig.newBuilder().withAsyncClusteringMaxCommits(2).build(),
tableVersion);
+ final int numRecords = 20;
+ String secondCommit;
+ String clusteringCommit;
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // 1st commit insert
+ String newCommitTime = client.startCommit();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.commit(newCommitTime, client.insert(writeRecords, newCommitTime));
+
+ // 2nd commit with inserts and updates which will create new file slice
due to small file handling.
+ newCommitTime = client.startCommit();
+ List<HoodieRecord> records2 =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+ client.commit(newCommitTime,
client.upsert(writeRecords2.union(writeRecords3), newCommitTime));
+ secondCommit = newCommitTime;
+ // add savepoint to 2nd commit
+ client.savepoint(secondCommit, "test user","test comment");
+
+ Option<String> clusteringInstant =
client.scheduleClustering(Option.empty());
+ clusteringCommit = clusteringInstant.get();
+ client.cluster(clusteringCommit);
+
+ // add new log files on top of the clustered file group
+ newCommitTime = client.startCommit();
+ List<HoodieRecord> updatedRecords =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ writeClient.commit(newCommitTime,
writeClient.upsert(jsc.parallelize(updatedRecords, 1), newCommitTime));
+ }
+ assertRowNumberEqualsTo(50);
+ StoragePathFilter filter = path ->
path.toString().contains(clusteringCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ // verify there is 1 base file and 1 log file created matching the
clustering timestamp if table version is 6.
+ // For table version 8 and above, the log file will not reference the
clustering commit.
+ assertEquals(tableVersion == HoodieTableVersion.SIX ? 2 : 1,
storage.listDirectEntries(
+ FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(),
pPath), filter)
+ .size());
+ }
+
+ // restore to 2nd commit and remove the clustering instant and delta
commit that followed it
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ client.restoreToSavepoint(secondCommit);
+ }
+ // verify that entire file slice created w/ base instant time of
clustering instant is completely rolled back.
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0, storage.listDirectEntries(
+ FSUtils.constructAbsolutePath(hoodieWriteConfig.getBasePath(),
pPath), filter)
+ .size());
+ }
+ validateFilesMetadata(hoodieWriteConfig);
+ Map<String, Integer> actualCommitToRowCount = getRecordCountPerCommit();
+ assertEquals(Collections.singletonMap(secondCommit, 50),
actualCommitToRowCount);
}
/**
@@ -223,17 +361,15 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
* <P>Expected behaviors: pending compaction after savepoint should also be
cleaned,
* the latest file slice should be fully delete, for DC4 a rollback log
append should be made.
*/
- @Test
- void testCleaningPendingCompaction() throws Exception {
- HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
- .withInlineCompaction(false)
- .withScheduleInlineCompaction(false)
- .compactionSmallFileSize(0)
- .build())
- .withRollbackUsingMarkers(true)
- .build();
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ void testCleaningPendingCompaction(HoodieTableVersion tableVersion) throws
Exception {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(false)
+ .compactionSmallFileSize(0).build(), tableVersion);
+ Map<String, Integer> commitToRowCount = new HashMap<>();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
String savepointCommit = null;
final int numRecords = 10;
@@ -245,6 +381,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.insert(writeRecords,
newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
+ commitToRowCount.put(newCommitTime, numRecords);
if (i == 3) {
// trigger savepoint
savepointCommit = newCommitTime;
@@ -261,13 +398,15 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
if (i == 1) {
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should
be scheduled");
- compactWithoutCommit(compactionInstant.get());
+ compactWithoutCommit(compactionInstant.get(), tableVersion);
}
}
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(30);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
}
}
@@ -277,16 +416,14 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
* <P>Expected behaviors: should roll back DC5, C6 and DC6.
* No files will be cleaned up. Only rollback log appends.
*/
- @Test
- void testCleaningCompletedRollback() throws Exception {
- HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit
triggers compaction
- .withInlineCompaction(false)
- .withScheduleInlineCompaction(false)
- .build())
- .withRollbackUsingMarkers(true)
- .build();
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+ void testCleaningCompletedRollback(HoodieTableVersion tableVersion) throws
Exception {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit
triggers compaction
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(false).build(), tableVersion);
+ Map<String, Integer> commitToRowCount = new HashMap<>();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
String savepointCommit = null;
final int numRecords = 10;
@@ -300,11 +437,14 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
client.commit(newCommitTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
if (i == 2) {
baseRecordsToUpdate = records;
+ } else {
+ commitToRowCount.put(newCommitTime, numRecords);
}
}
// update to generate log files, then a valid compaction plan can be
scheduled
- upsertBatch(client, baseRecordsToUpdate);
+ String updateCommitTime = upsertBatch(client, baseRecordsToUpdate);
+ commitToRowCount.put(updateCommitTime, baseRecordsToUpdate.size());
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should be
scheduled");
HoodieWriteMetadata result = client.compact(compactionInstant.get());
@@ -316,7 +456,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
assertRowNumberEqualsTo(20);
// write a delta_commit but does not commit
updateBatchWithoutCommit(WriteClientTestUtils.createNewInstantTime(),
- Objects.requireNonNull(baseRecordsToUpdate, "The records to update
should not be null"));
+ Objects.requireNonNull(baseRecordsToUpdate, "The records to update
should not be null"), tableVersion);
// rollback the delta_commit
metaClient = HoodieTableMetaClient.reload(metaClient);
assertTrue(client.rollbackFailedWrites(metaClient), "The last
delta_commit should be rolled back");
@@ -326,21 +466,239 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(20);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+ }
+ }
+
+ @Test
+ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ // Run compaction while delta-commit is in-flight
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed after compaction");
+
+ // write one more commit
+ String newCommitTime = client.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ // ensure the compaction instant is no longer present: although it
completed before the restore target, it was requested after it, and
compaction removal during restore still uses requested-time comparison
+
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
}
}
- private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord>
baseRecordsToUpdate) throws IOException {
+ @Test
+ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+ // commit the compaction instant after the delta commit
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed during compaction");
+
+ // write one more commit
+ String newCommitTime = writeClient.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ // ensure the compaction instant is not present because it was completed
after the target of the restore
+
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+ void
rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTableVersion
tableVersion) {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(tableVersion);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ // Schedule a compaction
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ // Start a delta commit
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+ // commit the compaction instant after the delta commit
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed during compaction");
+
+ // write one more commit
+ String newCommitTime = writeClient.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+ void testMissingFileDoesNotFailRestore(HoodieTableVersion tableVersion)
throws Exception {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(false)
+ .compactionSmallFileSize(0).build(), tableVersion);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // establish base files
+ String firstCommitTime = client.startCommit();
+ int numRecords = 10;
+ List<HoodieRecord> initialRecords =
dataGen.generateInserts(firstCommitTime, numRecords);
+ client.commit(firstCommitTime,
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+ // add updates that go to log files
+ String secondCommitTime = client.startCommit();
+ List<HoodieRecord> updatedRecords =
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+ client.commit(secondCommitTime,
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+ client.savepoint(secondCommitTime, "user1", "Savepoint for commit that
completed during compaction");
+
+ // add a third delta commit with log and new base files
+ String thirdCommitTime = client.startCommit();
+ List<HoodieRecord> upsertRecords =
Stream.concat(dataGen.generateUniqueUpdates(thirdCommitTime,
numRecords).stream(),
+ dataGen.generateInserts(thirdCommitTime,
numRecords).stream()).collect(Collectors.toList());
+ List<WriteStatus> writeStatuses =
client.upsert(jsc.parallelize(upsertRecords, 1), thirdCommitTime).collect();
+ client.commit(thirdCommitTime, jsc.parallelize(writeStatuses, 1));
+
+ // delete one base file and one log file to validate both cases are
handled gracefully
+ boolean deletedLogFile = false;
+ boolean deletedBaseFile = false;
+ for (WriteStatus writeStatus : writeStatuses) {
+ StoragePath path = FSUtils.constructAbsolutePath(basePath,
writeStatus.getStat().getPath());
+ if (deletedLogFile && deletedBaseFile) {
+ break;
+ }
+ if (FSUtils.isLogFile(path)) {
+ deletedLogFile = true;
+ storage.deleteFile(path);
+ } else {
+ deletedBaseFile = true;
+ storage.deleteFile(path);
+ }
+ }
+ client.restoreToSavepoint(secondCommitTime);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(secondCommitTime, numRecords),
getRecordCountPerCommit());
+ }
+ }
+
+ private Map<String, Integer> writeInitialCommitsForAsyncServicesTests(int
numRecords) {
+ Map<String, Integer> commitToRowCount = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ String newCommitTime = writeClient.startCommit();
+ List<HoodieRecord> records = i == 0 ?
dataGen.generateInserts(newCommitTime, numRecords) :
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<WriteStatus> writeStatus = i == 0 ?
writeClient.insert(jsc.parallelize(records, 1), newCommitTime) :
writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+ writeClient.commit(newCommitTime, writeStatus);
+ if (i == 2) {
+ commitToRowCount.put(newCommitTime, numRecords);
+ }
+ }
+ return commitToRowCount;
+ }
+
+ private HoodieWriteConfig
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion
tableVersion) {
+ HoodieWriteConfig config =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(false)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
+
.withProps(Collections.singletonMap(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
"0"))
+ .build();
+ try {
+ initMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ return config;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to initialize HoodieTableMetaClient",
e);
+ }
+ }
+
+ private void validateFilesMetadata(HoodieWriteConfig writeConfig) {
+ HoodieTableFileSystemView fileListingBasedView =
FileSystemViewManager.createInMemoryFileSystemView(context, metaClient,
+ HoodieMetadataConfig.newBuilder().enable(false).build());
+ FileSystemViewStorageConfig viewStorageConfig =
FileSystemViewStorageConfig.newBuilder().fromProperties(writeConfig.getProps())
+ .withStorageType(FileSystemViewStorageType.MEMORY).build();
+ HoodieTableFileSystemView metadataBasedView = (HoodieTableFileSystemView)
FileSystemViewManager
+ .createViewManager(context, writeConfig.getMetadataConfig(),
viewStorageConfig, writeConfig.getCommonConfig(),
+ (SerializableFunctionUnchecked<HoodieTableMetaClient,
HoodieTableMetadata>) v1 ->
+
metaClient.getTableFormat().getMetadataFactory().create(context,
metaClient.getStorage(), writeConfig.getMetadataConfig(),
writeConfig.getBasePath()))
+ .getFileSystemView(basePath);
+ assertForFSVEquality(fileListingBasedView, metadataBasedView, true,
Option.empty());
+ }
+
+ private String upsertBatch(SparkRDDWriteClient client, List<HoodieRecord>
baseRecordsToUpdate) throws IOException {
String newCommitTime = client.startCommit();
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime,
Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not
be null"));
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
client.commit(newCommitTime, client.upsert(writeRecords, newCommitTime),
Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+ return newCommitTime;
}
- private void compactWithoutCommit(String compactionInstantTime) {
+ private void compactWithoutCommit(String compactionInstantTime,
HoodieTableVersion tableVersion) {
HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withRollbackUsingMarkers(true)
.withEmbeddedTimelineServerEnabled(false)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
@@ -353,4 +711,35 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
+
+ private HoodieWriteConfig
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig compactionConfig,
HoodieTableVersion tableVersion) throws IOException {
+ return getHoodieWriteConfigAndInitializeTable(compactionConfig,
HoodieClusteringConfig.newBuilder().build(), tableVersion);
+ }
+
+ private HoodieWriteConfig
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig compactionConfig,
HoodieClusteringConfig clusteringConfig, HoodieTableVersion tableVersion)
+ throws IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(compactionConfig)
+ .withClusteringConfig(clusteringConfig)
+ .withRollbackUsingMarkers(true)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
+ .build();
+ initMetaClient(HoodieTableType.MERGE_ON_READ,
hoodieWriteConfig.getProps());
+ return hoodieWriteConfig;
+ }
+
+ private Map<String, Integer> getRecordCountPerCommit() {
+ Dataset<Row> dataset = sparkSession.read().format("hudi").load(basePath);
+ List<Row> rows = dataset.collectAsList();
+ int index =
dataset.schema().fieldIndex(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName());
+ Map<String, Integer> actualCommitToRowCount = new HashMap<>();
+ rows.forEach(row -> {
+ actualCommitToRowCount.compute(row.getString(index), (k, v) -> (v ==
null) ? 1 : v + 1);
+ });
+ return actualCommitToRowCount;
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 9e2325593099..61df5fc76d17 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -20,6 +20,8 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model._
@@ -35,7 +37,7 @@ import org.apache.hudi.util.JavaConversions
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource,
MethodSource}
import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +367,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
{
val lastCleanInstant =
getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant()
assertTrue(lastCleanInstant.isPresent)
+ val writeConfig = getWriteConfig(hudiOpts)
+ val client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc),
writeConfig)
+ client.savepoint("user", "note")
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
@@ -372,11 +377,10 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase {
assertTrue(getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant().get().requestedTime
.compareTo(lastCleanInstant.get().requestedTime) > 0)
- var rollbackedInstant: Option[HoodieInstant] = Option.empty
- while (rollbackedInstant.isEmpty || rollbackedInstant.get.getAction !=
ActionType.clean.name()) {
- // rollback clean instant
- rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
- }
+ client.restoreToSavepoint()
+ client.close()
+ // last commit is no longer present so remove it from the mergedDfList
+ mergedDfList = mergedDfList.take(mergedDfList.size - 1)
validateDataAndRecordIndices(hudiOpts)
}
@@ -574,12 +578,28 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase {
val function = () => doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
- executeFunctionNTimes(function, 5)
+ executeFunctionNTimes(function, 3)
+ // create a savepoint on the data table before the metadata table clean
operation
+
assertFalse(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
+ val writeConfig = getWriteConfig(hudiOpts)
+ val client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc),
writeConfig)
+ client.savepoint("user", "note")
+
+ // validate that the clean is present in the metadata table timeline
+ var iterations = 0
+ while
(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isEmpty)
{
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
+ iterations += 1
+ }
assertTrue(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
- rollbackLastInstant(hudiOpts)
- // Rolling back clean instant from MDT
- rollbackLastInstant(hudiOpts)
+ // restore to the savepoint to force the metadata table state to roll back
to before the clean
+ client.restoreToSavepoint()
+ client.close()
+ // remove the commits that were created after the savepoint
+ mergedDfList = mergedDfList.take(mergedDfList.size - iterations)
validateDataAndRecordIndices(hudiOpts)
}