This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77786d28627 [HUDI-6153] Changed the rollback mechanism for MDT to
actual rollbacks rather than appending revert blocks. (#8837)
77786d28627 is described below
commit 77786d28627cafc2982e20df89c2617d5c940811
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jul 20 05:19:58 2023 -0700
[HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks
rather than appending revert blocks. (#8837)
Prior to this patch, a rollback in the data table (DT) was synced to the
metadata table (MDT) as yet another delta commit, and we marked all
affected files as invalid in that delta commit. But as we add more
high-scale partitions like the record index, we can't afford to mark 100k
entries as invalid. So this patch triggers an actual rollback in the MDT
of the delta commit of interest (similar to how we do rollback in the DT).
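For illustration only, a minimal sketch of the new decision (hypothetical
names, not the patch itself): a DT commit can be rolled back in the MDT
only if it is later than the latest MDT compaction, relying on the fact
that Hudi instant timestamps order lexicographically; anything at or
before that compaction is already folded into MDT base files and must go
through restore instead.

    import java.util.Optional;

    // Sketch of the MDT rollback gate; all names are hypothetical.
    class MdtRollbackSketch {
      // True if the MDT deltacommit can be rolled back in place.
      static boolean canRollbackInMdt(String commitToRollback,
                                      Optional<String> latestMdtCompaction) {
        // A commit at or before the latest MDT compaction is already part
        // of MDT base files; it needs a restore, not a rollback.
        return latestMdtCompaction
            .map(compaction -> commitToRollback.compareTo(compaction) > 0)
            .orElse(true);
      }

      public static void main(String[] args) {
        System.out.println(canRollbackInMdt("00000009", Optional.of("00000008001"))); // true
        System.out.println(canRollbackInMdt("00000007", Optional.of("00000008001"))); // false
      }
    }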
---------
Co-authored-by: Sagar Sumit <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../hudi/cli/integ/ITTestSavepointsCommand.java | 20 +-
.../hudi/client/BaseHoodieTableServiceClient.java | 16 +-
.../apache/hudi/client/BaseHoodieWriteClient.java | 51 ++-
.../metadata/HoodieBackedTableMetadataWriter.java | 166 ++++++++--
.../hudi/metadata/HoodieMetadataWriteUtils.java | 2 +-
.../hudi/metadata/HoodieTableMetadataWriter.java | 6 -
.../hudi/table/upgrade/UpgradeDowngrade.java | 3 +-
.../common/testutils/HoodieMetadataTestTable.java | 6 +-
.../hudi/utils/TestMetadataConversionUtils.java | 4 +-
.../SparkHoodieBackedTableMetadataWriter.java | 9 +-
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 11 +
.../hudi/table/HoodieSparkMergeOnReadTable.java | 17 +
.../org/apache/hudi/client/TestClientRollback.java | 3 +-
.../functional/TestHoodieBackedMetadata.java | 71 ++--
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 87 ++---
.../TestHoodieSparkMergeOnReadTableRollback.java | 19 +-
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 34 +-
.../hudi/common/config/HoodieMetadataConfig.java | 2 +-
.../table/timeline/HoodieDefaultTimeline.java | 6 +
.../hudi/common/table/timeline/HoodieTimeline.java | 9 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 358 ++++++++++-----------
.../hudi/common/testutils/HoodieTestTable.java | 58 +++-
.../sink/TestStreamWriteOperatorCoordinator.java | 7 +-
.../org/apache/hudi/functional/TestBootstrap.java | 1 +
.../apache/hudi/utilities/TestHoodieIndexer.java | 3 +-
25 files changed, 601 insertions(+), 368 deletions(-)
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 9bc368e9522..7bf38338a5d 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@@ -100,9 +101,15 @@ public class ITTestSavepointsCommand extends
HoodieCLIIntegrationTestBase {
/**
* Test case of command 'savepoint rollback'.
*/
- @Test
+ @Disabled("HUDI-6571") // TODO: Fix this test. Probably need to fix
HoodieTestDataGenerator to create non-empty commit metadata.
public void testRollbackToSavepoint() throws IOException {
- // generate four savepoints
+ // disable metadata table.
+ Object result = shell.evaluate(() ->
+ String.format("metadata delete"));
+
+ assertTrue(ShellEvaluationResultUtil.isSuccess(result));
+
+ // generate four commits
for (int i = 100; i < 104; i++) {
String instantTime = String.valueOf(i);
HoodieTestDataGenerator.createCommitFile(tablePath, instantTime,
jsc.hadoopConfiguration());
@@ -112,13 +119,14 @@ public class ITTestSavepointsCommand extends
HoodieCLIIntegrationTestBase {
String savepoint = "102";
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint,
jsc.hadoopConfiguration());
- Object result = shell.evaluate(() ->
+ result = shell.evaluate(() ->
String.format("savepoint rollback --savepoint %s --sparkMaster
%s", savepoint, "local"));
+ Object finalResult = result;
assertAll("Command run failed",
- () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(finalResult)),
() -> assertEquals(
- String.format("Savepoint \"%s\" rolled back", savepoint),
result.toString()));
+ String.format("Savepoint \"%s\" rolled back", savepoint),
finalResult.toString()));
// there is 1 restore instant
HoodieActiveTimeline timeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
@@ -132,7 +140,7 @@ public class ITTestSavepointsCommand extends
HoodieCLIIntegrationTestBase {
/**
* Test case of command 'savepoint rollback' with metadata table bootstrap.
*/
- @Test
+ @Disabled("HUDI-6571")
public void testRollbackToSavepointWithMetadataTableEnable() throws
IOException {
// generate four savepoints
for (int i = 101; i < 105; i++) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 5eb2a8f9ff3..05944e71711 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -861,8 +861,22 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
*/
@Deprecated
public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking)
throws HoodieRollbackException {
- LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = pendingRollbackInfo.map(entry ->
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+ return rollback(commitInstantTime, pendingRollbackInfo,
rollbackInstantTime, skipLocking);
+ }
+
+ /**
+ * @param commitInstantTime Instant time of the commit
+ * @param pendingRollbackInfo pending rollback instant and plan if rollback
failed from previous attempt.
+ * @param skipLocking if this is triggered by another parent
transaction, locking can be skipped.
+ * @throws HoodieRollbackException if rollback cannot be performed
successfully
+ * @deprecated Rollback the inflight record changes with the given commit
time. This
+ * will be removed in the future in favor of {@link
BaseHoodieWriteClient#restoreToInstant(String, boolean)}
+ */
+ @Deprecated
+ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String
rollbackInstantTime,
+ boolean skipLocking) throws HoodieRollbackException {
+ LOG.info("Begin rollback of instant " + commitInstantTime);
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, hadoopConf);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 48aae4ff135..050ad0070da 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -74,6 +74,7 @@ import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -107,7 +108,6 @@ import java.util.stream.Collectors;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
/**
* Abstract Write Client providing functionality for performing commit, index
updates and rollback
@@ -690,36 +690,52 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param savepointTime Savepoint time to rollback to
*/
public void restoreToSavepoint(String savepointTime) {
- boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
- if (initialMetadataTableIfNecessary) {
+ boolean initializeMetadataTableIfNecessary =
config.isMetadataTableEnabled();
+ if (initializeMetadataTableIfNecessary) {
try {
- // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and beforeTimelineStarts
+ // Delete the metadata table directly when users trigger a savepoint
rollback if the MDT exists and the savepointTime is before the start of
the timeline
+ // or before the oldest compaction on the MDT.
+ // We cannot restore to before the oldest compaction on the MDT as we
don't have the base files from before that time.
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf)
.setBasePath(getMetadataTableBasePath(config.getBasePath())).build();
- // Same as HoodieTableMetadataUtil#processRollbackMetadata
- HoodieInstant syncedInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+ Option<HoodieInstant> oldestMdtCompaction =
mdtMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
+ boolean deleteMDT = false;
+ if (oldestMdtCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(savepointTime,
oldestMdtCompaction.get().getTimestamp())) {
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than oldest compaction %s on MDT",
+ savepointTime, oldestMdtCompaction.get().getTimestamp()));
+ deleteMDT = true;
+ }
+ }
+
// The instant required to sync the rollback to the MDT has been archived
and the MDT sync would fail,
// so we need to delete the whole MDT here.
- if
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
{
- HoodieTableMetaClient dataMetaClient =
HoodieTableMetaClient.builder()
- .setConf(hadoopConf)
- .setBasePath(config.getBasePath()).build();
- deleteMetadataTable(dataMetaClient, getEngineContext(), false);
+ if (!deleteMDT) {
+ HoodieInstant syncedInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+ if
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
{
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than the MDT timeline %s",
+ savepointTime,
mdtMetaClient.getCommitsTimeline().firstInstant().get().getTimestamp()));
+ deleteMDT = true;
+ }
+ }
+
+ if (deleteMDT) {
+ HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(),
context);
// rollbackToSavepoint action will try to bootstrap the MDT first, but
the sync to the MDT will fail in this scenario,
// so we need to disable metadata initialization here.
- initialMetadataTableIfNecessary = false;
+ initializeMetadataTableIfNecessary = false;
}
} catch (Exception e) {
// Metadata directory does not exist
}
}
- HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty(), initialMetadataTableIfNecessary);
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty(), initializeMetadataTableIfNecessary);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
ValidationUtils.checkArgument(!config.shouldArchiveBeyondSavepoint(),
"Restore is not supported when " +
HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key()
+ " is enabled");
- restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
+ restoreToInstant(savepointTime, initializeMetadataTableIfNecessary);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
@@ -730,6 +746,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo,
false);
}
+ @Deprecated
+ public boolean rollback(final String commitInstantTime, String
rollbackInstantTimestamp) throws HoodieRollbackException {
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty());
+ Option<HoodiePendingRollbackInfo> pendingRollbackInfo =
tableServiceClient.getPendingRollbackInfo(table.getMetaClient(),
commitInstantTime);
+ return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo,
rollbackInstantTimestamp, false);
+ }
+
/**
* NOTE : This action requires all writers (ingest and compact) to a table
to be stopped before proceeding. Revert
* the (inflight/committed) record changes for all commits after the
provided instant time.
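As a hedged sketch of the restoreToSavepoint change above (hypothetical
names, plain-string comparisons standing in for the HoodieTimeline
checks): the MDT is deleted and rebuilt whenever the savepoint predates
either the oldest MDT compaction or the start of the MDT timeline.

    import java.util.Optional;

    // Sketch of the delete-MDT decision; names are hypothetical.
    class DeleteMdtDecisionSketch {
      static boolean shouldDeleteMdt(String savepointTime,
                                     Optional<String> oldestMdtCompaction,
                                     String firstMdtInstant) {
        // Base files from before the oldest MDT compaction no longer
        // exist, so the MDT cannot be restored past that point.
        if (oldestMdtCompaction.isPresent()
            && savepointTime.compareTo(oldestMdtCompaction.get()) <= 0) {
          return true;
        }
        // If the savepoint predates the MDT timeline, the instants needed
        // to sync the restore have been archived; rebuild the MDT.
        return savepointTime.compareTo(firstMdtInstant) < 0;
      }

      public static void main(String[] args) {
        System.out.println(shouldDeleteMdt("102", Optional.of("103"), "100")); // true
      }
    }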
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index df73145a1bb..b63c6a5c649 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -52,9 +54,12 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -90,6 +95,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -97,13 +103,13 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
import static
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
@@ -433,7 +439,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
*/
private String generateUniqueCommitInstantTime(String initializationTime) {
// if it's initialized via the async indexer, we don't need to alter the
init time
- if (initializationTime.length() == MILLIS_INSTANT_ID_LENGTH +
METADATA_INDEXER_TIME_SUFFIX.length()) {
+ if (HoodieTableMetadataUtil.isIndexingCommit(initializationTime)) {
return initializationTime;
}
// Add suffix to initializationTime to find an unused instant time for the
next index initialization.
@@ -859,7 +865,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient,
partitionTypes);
// initialize partitions
- initializeFromFilesystem(indexUptoInstantTime +
METADATA_INDEXER_TIME_SUFFIX, partitionTypes, Option.empty());
+
initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime),
partitionTypes, Option.empty());
}
/**
@@ -906,10 +912,59 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime()));
- closeInternal();
+ dataMetaClient.reloadActiveTimeline();
+
+ // Fetch the commit to restore to (savepointed commit time)
+ HoodieInstant restoreInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION,
instantTime);
+ HoodieInstant requested =
HoodieTimeline.getRestoreRequestedInstant(restoreInstant);
+ HoodieRestorePlan restorePlan = null;
+ try {
+ restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
+
dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(),
HoodieRestorePlan.class);
+ } catch (IOException e) {
+ throw new HoodieIOException("Deserialization of restore plan failed
whose restore instant time is " + instantTime + " in data table", e);
+ }
+ final String restoreToInstantTime =
restorePlan.getSavepointToRestoreTimestamp();
+ LOG.info("Triggering restore to " + restoreToInstantTime + " in metadata
table");
+
+ // fetch the earliest commit to retain and ensure the base file prior to
the time to restore is present
+ List<HoodieFileGroup> filesGroups =
metadata.getMetadataFileSystemView().getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+
+ boolean cannotRestore = filesGroups.stream().map(fileGroup ->
fileGroup.getAllFileSlices().map(fileSlice ->
fileSlice.getBaseInstantTime()).anyMatch(
+ instantTime1 -> HoodieTimeline.compareTimestamps(instantTime1,
LESSER_THAN_OR_EQUALS, restoreToInstantTime))).anyMatch(canRestore ->
!canRestore);
+ if (cannotRestore) {
+ throw new HoodieMetadataException(String.format("Can't restore to %s
since there is no base file in MDT lesser than the commit to restore to. "
+ + "Please delete metadata table and retry", restoreToInstantTime));
+ }
+
+ // Restore requires the existing pipelines to be shut down, so we can
safely scan the dataset to find the current
+ // list of files in the filesystem.
+ List<DirectoryInfo> dirInfoList =
listAllPartitionsFromFilesystem(instantTime);
+ Map<String, DirectoryInfo> dirInfoMap =
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath,
Function.identity()));
+ dirInfoList.clear();
+
+ getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+ // At this point we have also reverted the cleans which have occurred
after the restoreToInstantTime. Hence, a sync
+ // is required to bring back those cleans.
+ try {
+ initMetadataReader();
+ Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+ Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+ List<String> partitionsToDelete = new ArrayList<>();
+ fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap,
partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
+
+ // Even if we don't have any deleted files to sync, we still create an
empty commit so that we can track that the restore has completed.
+ // We cannot create a deltaCommit at instantTime now because a future
(rollback) block has already been written to the logFiles.
+ // We need to choose a timestamp which would be a validInstantTime for
MDT. This is either a commit timestamp completed on the dataset
+ // or a timestamp with suffix which we use for MDT clean, compaction etc.
+ String syncCommitTime =
HoodieTableMetadataUtil.createRestoreTimestamp(HoodieActiveTimeline.createNewInstantTime());
+ processAndCommit(syncCommitTime, () ->
HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+ partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete,
syncCommitTime));
+ closeInternal();
+ } catch (IOException e) {
+ throw new HoodieMetadataException("IOException during MDT restore sync",
e);
+ }
}
/**
@@ -921,23 +976,47 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
if (initialized && metadata != null) {
- // Is this rollback of an instant that has been synced to the metadata
table?
- String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
- boolean wasSynced =
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
- if (!wasSynced) {
- // A compaction may have taken place on metadata table which would
have included this instant being rolled back.
- // Revisit this logic to relax the compaction fencing :
https://issues.apache.org/jira/browse/HUDI-2458
- Option<String> latestCompaction = metadata.getLatestCompactionTime();
- if (latestCompaction.isPresent()) {
- wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
+ // The commit which is being rolled back on the dataset
+ final String commitToRollbackInstantTime =
rollbackMetadata.getCommitsRollback().get(0);
+ // Find the deltacommits since the last compaction
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+
+ // This could be a compaction or deltacommit instant (See
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+ HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+ HoodieTimeline deltacommitsSinceCompaction =
deltaCommitsInfo.get().getKey();
+
+ // The deltacommit that will be rolled back
+ HoodieInstant deltaCommitInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
+
+ // The commit being rolled back should not be earlier than the latest
compaction on the MDT. Compaction on the MDT only occurs when all actions
+ // are completed on the dataset. Hence, this case implies a rollback of
a completed commit, which should actually be handled using restore.
+ if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+ final String compactionInstantTime = compactionInstant.getTimestamp();
+ if
(HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime,
compactionInstantTime)) {
+ throw new HoodieMetadataException(String.format("Commit being rolled
back %s is earlier than the latest compaction %s. "
+ + "There are %d deltacommits after this compaction: %s",
commitToRollbackInstantTime, compactionInstantTime,
+ deltacommitsSinceCompaction.countInstants(),
deltacommitsSinceCompaction.getInstants()));
}
}
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
- HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
metadataMetaClient.getActiveTimeline(),
- rollbackMetadata, getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime(), wasSynced);
- commit(instantTime, records);
+ // let's apply a delta commit with the DT's rollback instant (with a
special suffix) containing the following records:
+ // a. any log files that were added as part of the rollback commit
metadata
+ // b. log files added by the commit in the DT being rolled back. By
rolled back, we mean a rollback block will be added; it does not mean
they will be deleted.
+ // both of the above lists should only be added to the FILES partition.
+
+ String rollbackInstantTime = createRollbackTimestamp(instantTime);
+ processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient,
rollbackMetadata, instantTime));
+
+ if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
+ LOG.info("Rolling back MDT deltacommit " +
commitToRollbackInstantTime);
+ if (!getWriteClient().rollback(commitToRollbackInstantTime,
rollbackInstantTime)) {
+ throw new HoodieMetadataException("Failed to rollback deltacommit at
" + commitToRollbackInstantTime);
+ }
+ } else {
+ LOG.info(String.format("Ignoring rollback of instant %s at %s. The
commit to rollback is not found in MDT",
+ commitToRollbackInstantTime, instantTime));
+ }
closeInternal();
}
}
@@ -1164,6 +1243,47 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
return true;
}
+ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String,
DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd,
+ Map<String,
List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws
IOException {
+
+ for (String partition : metadata.fetchAllPartitionPaths()) {
+ Path partitionPath = null;
+ if (StringUtils.isNullOrEmpty(partition) &&
!dataMetaClient.getTableConfig().isTablePartitioned()) {
+ partitionPath = new Path(dataWriteConfig.getBasePath());
+ } else {
+ partitionPath = new Path(dataWriteConfig.getBasePath(), partition);
+ }
+ final String partitionId =
HoodieTableMetadataUtil.getPartitionIdentifier(partition);
+ FileStatus[] metadataFiles =
metadata.getAllFilesInPartition(partitionPath);
+ if (!dirInfoMap.containsKey(partition)) {
+ // Entire partition has been deleted
+ partitionsToDelete.add(partitionId);
+ if (metadataFiles != null && metadataFiles.length > 0) {
+ partitionFilesToDelete.put(partitionId,
Arrays.stream(metadataFiles).map(f ->
f.getPath().getName()).collect(Collectors.toList()));
+ }
+ } else {
+ // Some files need to be cleaned and some to be added in the partition
+ Map<String, Long> fsFiles =
dirInfoMap.get(partition).getFileNameToSizeMap();
+ List<String> mdtFiles = Arrays.stream(metadataFiles).map(mdtFile ->
mdtFile.getPath().getName()).collect(Collectors.toList());
+ List<String> filesDeleted = Arrays.stream(metadataFiles).map(f ->
f.getPath().getName())
+ .filter(n -> !fsFiles.containsKey(n)).collect(Collectors.toList());
+ Map<String, Long> filesToAdd = new HashMap<>();
+ // new files could have been added to the DT by the restore that just
happened, which may not be tracked in RestoreMetadata.
+ dirInfoMap.get(partition).getFileNameToSizeMap().forEach((k,v) -> {
+ if (!mdtFiles.contains(k)) {
+ filesToAdd.put(k,v);
+ }
+ });
+ if (!filesToAdd.isEmpty()) {
+ partitionFilesToAdd.put(partitionId, filesToAdd);
+ }
+ if (!filesDeleted.isEmpty()) {
+ partitionFilesToDelete.put(partitionId, filesDeleted);
+ }
+ }
+ }
+ }
+
/**
* Return records that represent update to the record index due to write
operation on the dataset.
*
@@ -1282,6 +1402,8 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
return initialized;
}
+ protected abstract BaseHoodieWriteClient getWriteClient();
+
/**
* A class which represents a directory and the files and directories inside
it.
* <p>
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index f431283ac7a..9d7ecc8be0f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -94,7 +94,7 @@ public class HoodieMetadataWriteUtils {
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
- .retainCommits(Math.min(writeConfig.getCleanerCommitsRetained(),
DEFAULT_METADATA_CLEANER_COMMITS_RETAINED))
+ .retainCommits(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED)
.build())
// we will trigger archive manually, to ensure only regular writer
invokes it
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index b809a4999e4..395749657f9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -94,11 +93,6 @@ public interface HoodieTableMetadataWriter extends
Serializable, AutoCloseable {
*/
void deletePartitions(String instantTime, List<MetadataPartitionType>
partitions);
- /**
- * It returns write client for metadata table.
- */
- BaseHoodieWriteClient getWriteClient();
-
/**
* Returns true if the metadata table is initialized.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index b7636a25e7c..a19e067aae1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -151,7 +151,8 @@ public class UpgradeDowngrade {
fromVersion = prevVersion;
}
}
-
+ // Reload the meta client to get the latest table config (which could have
been updated due to metadata table)
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// Write out the current version in hoodie.properties.updated file
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
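A hedged usage sketch of why the reload above matters: reload() re-reads
hoodie.properties, so the table-config writes that follow operate on the
latest state (the MDT bootstrap may have just updated it). The wrapper
below is illustrative, not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    class ReloadSketch {
      // Builds a meta client and immediately refreshes it from storage.
      static HoodieTableMetaClient freshMetaClient(Configuration hadoopConf,
                                                   String basePath) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(basePath)
            .build();
        // Re-read hoodie.properties to pick up concurrent config changes.
        return HoodieTableMetaClient.reload(metaClient);
      }
    }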
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 7297759f21c..6e6d609c848 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -23,6 +23,7 @@ import
org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
@@ -129,11 +130,12 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
}
@Override
- public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata) throws IOException {
- super.addRollback(instantTime, rollbackMetadata);
+ public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata, HoodieRollbackPlan rollbackPlan)
throws IOException {
+ super.addRollback(instantTime, rollbackMetadata, rollbackPlan);
if (writer != null) {
writer.update(rollbackMetadata, instantTime);
}
+ super.addRollbackCompleted(instantTime, rollbackMetadata, false);
return this;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 1bbe10db0f5..3938df3f3af 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -194,7 +194,9 @@ public class TestMetadataConversionUtils extends
HoodieCommonTestHarness {
rollbackMetadata.setPartitionMetadata(partitionMetadataMap);
rollbackMetadata.setInstantsRollback(Arrays.asList(new
HoodieInstantInfo("1", HoodieTimeline.COMMIT_ACTION)));
HoodieTestTable.of(metaClient)
- .addRollback(instantTime, rollbackMetadata);
+ .addRollback(instantTime, rollbackMetadata, null);
+ HoodieTestTable.of(metaClient)
+ .addRollbackCompleted(instantTime, rollbackMetadata, false);
}
private void createCommitMetadata(String instantTime) throws Exception {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 9a254409b8d..2346831162b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -37,8 +37,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,8 +123,8 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
}
protected void bulkCommit(
- String instantTime, MetadataPartitionType partitionType,
HoodieData<HoodieRecord> records,
- int fileGroupCount) {
+ String instantTime, MetadataPartitionType partitionType,
HoodieData<HoodieRecord> records,
+ int fileGroupCount) {
SparkHoodieMetadataBulkInsertPartitioner partitioner = new
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
commitInternal(instantTime, Collections.singletonMap(partitionType,
records), Option.of(partitioner));
}
@@ -138,7 +139,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
try (SparkRDDWriteClient writeClient = (SparkRDDWriteClient)
getWriteClient()) {
// rollback partially failed writes if any.
if (dataWriteConfig.getFailedWritesCleanPolicy().isEager()
- && writeClient.rollbackFailedWrites()) {
+ && writeClient.rollbackFailedWrites()) {
metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 7cb37c898db..e9d21350c21 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -43,7 +43,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -88,6 +90,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+
/**
* Implementation of a very heavily read-optimized Hoodie Table where all
data is stored in base files, with
* zero read amplification.
@@ -197,6 +201,13 @@ public class HoodieSparkCopyOnWriteTable<T>
@Override
public void rollbackBootstrap(HoodieEngineContext context, String
instantTime) {
+ // Delete the metadata table to roll back a failed bootstrap. A
re-attempt of the bootstrap will re-initialize the MDT.
+ try {
+ LOG.info("Deleting metadata table because we are rolling back a failed
bootstrap.");
+ deleteMetadataTable(config.getBasePath(), context);
+ } catch (HoodieMetadataException e) {
+ throw new HoodieException("Failed to delete metadata table.", e);
+ }
new RestorePlanActionExecutor<>(context, config, this, instantTime,
HoodieTimeline.INIT_INSTANT_TS).execute();
new CopyOnWriteRestoreActionExecutor<>(context, config, this, instantTime,
HoodieTimeline.INIT_INSTANT_TS).execute();
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index efe0a2cff53..f5929fdc667 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -36,7 +36,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -57,11 +59,16 @@ import
org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+
/**
* Implementation of a more real-time Hoodie Table that provides tradeoffs
on read and write cost/amplification.
*
@@ -80,6 +87,8 @@ import java.util.Map;
*/
public class HoodieSparkMergeOnReadTable<T> extends
HoodieSparkCopyOnWriteTable<T> implements HoodieCompactionHandler<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieSparkMergeOnReadTable.class);
+
HoodieSparkMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext
context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@@ -168,6 +177,14 @@ public class HoodieSparkMergeOnReadTable<T> extends
HoodieSparkCopyOnWriteTable<
@Override
public void rollbackBootstrap(HoodieEngineContext context, String
instantTime) {
+ // Delete the metadata table to roll back a failed bootstrap. A
re-attempt of the bootstrap will re-initialize the MDT.
+ try {
+ LOG.info("Deleting metadata table because we are rolling back a failed
bootstrap.");
+ deleteMetadataTable(config.getBasePath(), context);
+ } catch (HoodieMetadataException e) {
+ throw new HoodieException("Failed to delete metadata table.", e);
+ }
+
new RestorePlanActionExecutor<>(context, config, this, instantTime,
HoodieTimeline.INIT_INSTANT_TS).execute();
new MergeOnReadRestoreActionExecutor<>(context, config, this, instantTime,
HoodieTimeline.INIT_INSTANT_TS).execute();
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 166f6daf74a..a8b6f77a6a6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -228,7 +228,8 @@ public class TestClientRollback extends
HoodieClientTestBase {
@Test
public void testGetSavepointOldSchema() throws Exception {
HoodieWriteConfig cfg =
getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs,
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 5580d07663e..56fabb362b6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -671,6 +671,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withMetadataIndexBloomFilterFileGroups(4)
.withMetadataIndexColumnStats(true)
.withMetadataIndexBloomFilterFileGroups(2)
+ .withMaxNumDeltaCommitsBeforeCompaction(12) // cannot restore to
before the oldest compaction on MDT as there are no base files before that time
.build())
.build();
init(tableType, writeConfig);
@@ -1332,16 +1333,13 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(testTable);
++numRollbacks;
} catch (HoodieMetadataException e) {
- exceptionRaised = true;
+ // This is expected since we are rolling back commits that are older
than the latest compaction on the MDT
break;
}
}
-
- assertFalse(exceptionRaised, "Metadata table should not archive instants
that are in dataset active timeline");
// Since each rollback also creates a deltacommit, we can only support
rolling back of half of the original
// instants present before rollback started.
- assertTrue(numRollbacks >= minArchiveCommitsDataset / 2,
- "Rollbacks of non archived instants should work");
+ assertTrue(numRollbacks >= minArchiveCommitsDataset / 2, "Rollbacks of non
archived instants should work");
}
/**
@@ -1591,20 +1589,30 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
extraFiles.put("p1", Collections.singletonList("f10"));
extraFiles.put("p2", Collections.singletonList("f12"));
testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFiles);
- if (!ignoreSpuriousDeletes) {
- assertThrows(HoodieMetadataException.class, () ->
validateMetadata(testTable));
- } else {
- validateMetadata(testTable);
- }
+ validateMetadata(testTable);
}
/**
* Test several table operations with restore. This test uses
SparkRDDWriteClient.
* Once the restore support is ready in HoodieTestTable, then rewrite this
test.
*/
- @ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testTableOperationsWithRestore(HoodieTableType tableType) throws
Exception {
+ @Test
+ public void testTableOperationsWithRestore() throws Exception {
+ HoodieTableType tableType = COPY_ON_WRITE;
+ init(tableType);
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+ .withRollbackUsingMarkers(false).build();
+ testTableOperationsImpl(engineContext, writeConfig);
+ }
+
+ /**
+ * Test several table operations with restore. This test uses
SparkRDDWriteClient.
+ * Once the restore support is ready in HoodieTestTable, then rewrite this
test.
+ */
+ @Test
+ public void testTableOperationsWithRestoreforMOR() throws Exception {
+ HoodieTableType tableType = MERGE_ON_READ;
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
@@ -1802,12 +1810,17 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
* @param writeConfig - Write config
*/
private void testTableOperationsImpl(HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) throws IOException {
+
+ String newCommitTime = null;
+ List<HoodieRecord> records = new ArrayList<>();
+ List<WriteStatus> writeStatuses = null;
+
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
// Write 1 (Bulk insert)
- String newCommitTime = "20210101000100000";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ newCommitTime = "20210101000100000";
+ records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
- List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ writeStatuses = client.bulkInsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
@@ -1852,6 +1865,10 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
+ }
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
+
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
newCommitTime = "20210101000700000";
@@ -1860,15 +1877,15 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
}
- // Deletes
+ // upserts
newCommitTime = "20210101000900000";
- records = dataGen.generateDeletes(newCommitTime, 10);
- JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r ->
r.getKey());
client.startCommitWithTime(newCommitTime);
- client.delete(deleteKeys, newCommitTime);
+ records = dataGen.generateUpdates(newCommitTime, 5);
+ writeStatuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
// Clean
- newCommitTime = "20210101000900000";
+ newCommitTime = "20210101001000000";
client.clean(newCommitTime);
validateMetadata(client);
@@ -2353,10 +2370,9 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertTrue(oldStatus.getModificationTime() <
newStatus.getModificationTime());
// Test downgrade by running the downgrader
- new UpgradeDowngrade(metaClient, writeConfig, context,
SparkUpgradeDowngradeHelper.getInstance())
- .run(HoodieTableVersion.TWO, null);
-
- assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(),
HoodieTableVersion.TWO.versionCode());
+ new UpgradeDowngrade(metaClient, writeConfig, context,
SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.TWO, null);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ assertEquals(HoodieTableVersion.TWO.versionCode(),
metaClient.getTableConfig().getTableVersion().versionCode());
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table
should not exist");
}
@@ -2522,6 +2538,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertFalse(fs.exists(new Path(getMetadataTableBasePath(basePath)
+ Path.SEPARATOR +
MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// Insert/upsert third batch of records
client = getHoodieWriteClient(cfg);
commitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -2544,7 +2561,6 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
+ Path.SEPARATOR + FILES.getPartitionPath())));
assertTrue(fs.exists(new Path(getMetadataTableBasePath(basePath)
+ Path.SEPARATOR +
MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
-
}
/**
@@ -2828,7 +2844,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
/**
* Test duplicate operation with same instant timestamp.
- *
+ * <p>
* This can happen if the commit on the MDT succeeds but fails on the
dataset. For some table services like clean,
* compaction, replace commit, the operation will be retried with the same
timestamp (taken from inflight). Hence,
* metadata table will see an additional commit with the same timestamp as a
previously completed deltacommit.
@@ -3390,6 +3406,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
private void changeTableVersion(HoodieTableVersion version) throws
IOException {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
metaClient.getTableConfig().setTableVersion(version);
Path propertyFile = new Path(metaClient.getMetaPath() + "/" +
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index fe89fb927ae..5b8f7cd20c7 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1613,7 +1613,7 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))
.setLoadActiveTimelineOnLoad(true).build();
- for (int i = 1; i <= 17; i++) {
+ for (int i = 1; i <= 18; i++) {
if (i != 2) {
testTable.doWriteOperation("000000" + String.format("%02d", i),
WriteOperationType.UPSERT,
i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"), 2);
@@ -1630,77 +1630,77 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
List<HoodieInstant> metadataTableInstants =
metadataTableMetaClient.getActiveTimeline()
.getCommitsTimeline().filterCompletedInstants().getInstants();
- if (i <= 7) {
+ if (i == 1) {
+ // In the metadata table timeline, the first delta commit is
"00000000000000"
+ assertEquals(i + 1, metadataTableInstants.size());
+ assertTrue(metadataTableInstants.contains(
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+ assertTrue(metadataTableInstants.contains(
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+ } else if (i <= 8) {
// In the metadata table timeline, the first delta commit is
"00000000000000"
// from metadata table init, delta commits "00000001" till "00000007"
are added
// later on without archival or compaction
- assertEquals(i + 1, metadataTableInstants.size());
+ // rollback in DT will also trigger rollback in MDT
+ assertEquals(i, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
- IntStream.range(1, i + 1).forEach(j ->
+ // rolled back commits may not be present in MDT timeline (00000001)
+ IntStream.range(2, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
- } else if (i == 8) {
- // i == 8
- // The instant "00000000000000" was archived since it's less than
+ } else if (i == 9) {
+ // i == 9
+ // The instant "00000000000010" was archived since it's less than
// the earliest commit on the dataset active timeline,
// the dataset active timeline has instants:
// 00000002.rollback, 00000007.commit, 00000008.commit
assertEquals(9, metadataTableInstants.size());
- assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000007001")));
- IntStream.range(1, i + 1).forEach(j ->
+ // mdt timeline 00000002, 00000003, ..., 00000008,
00000008001(compaction), 00000009
+ IntStream.range(2, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
- } else if (i <= 11) {
- // In the metadata table timeline, the first delta commit is "00000005"
+ } else if (i <= 12) {
+ // In the metadata table timeline, the first delta commit is "00000006"
// because it equals with the earliest commit on the dataset timeline,
after archival,
- // delta commits "00000006" till "00000011" are added later on without
archival or compaction
- assertEquals(i - 3, metadataTableInstants.size());
- assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000007001")));
- IntStream.range(5, i + 1).forEach(j ->
- assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION,
- "000000" + String.format("%02d", j)))));
- } else if (i <= 14) {
- // In the metadata table timeline, the first delta commit is
"00000007001"
- // from metadata table compaction, after archival, delta commits
"00000008"
- // till "00000014" are added later on without archival or compaction
- assertEquals(i - 6, metadataTableInstants.size());
+ // delta commits "00000006" till "00000010" are added later on without
archival or compaction
+ // mdt timeline 00000006, 00000007, 00000008, 00000008.compact,
00000009, 00000010 for i = 10
+ assertEquals(i - 4, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000007001")));
- IntStream.range(8, i + 1).forEach(j ->
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000008001")));
+ IntStream.range(6, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
- } else if (i == 15) {
- // Only delta commits "00000008" till "00000015" are in the active
timeline + 007001 for compaction.
- assertEquals(9, metadataTableInstants.size());
+ } else if (i <= 16) {
+ // In the metadata table timeline, the first delta commit is
"00000008001"
+ // from metadata table compaction, after archival, delta commits
"00000009"
+ // till "00000016" are added later on without archival or compaction
+ // mdt timeline 00000008001, 00000009, 00000010, 00000011, 00000012,
00000013
+ assertEquals(i - 7, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000007001")));
- IntStream.range(8, 16).forEach(j ->
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000008001")));
+ IntStream.range(9, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
- } else if (i == 16) {
- // i == 16
- // dataset timeline has commits "00000015" and "00000016",
- // the metadata timeline has commits [00000008, 00000016] and
"00000015001"
+ } else if (i == 17) {
+ // i == 17
+ // commits in MDT [00000009, ..., 00000016, 00000016001 (compaction),
00000017]
assertEquals(10, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000015001")));
- IntStream.range(8, 17).forEach(j ->
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000016001")));
+ IntStream.range(9, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
} else {
- // i == 17
- // Only commits [00000013, 00000017] and "00000015001" are on the
metadata timeline
+ // i == 18
+ // commits in MDT [00000014, ..., 00000016, 00000016001 (compaction),
00000017, 00000018]
assertEquals(6, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000015001")));
- IntStream.range(13, 18).forEach(j ->
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000016001")));
+ IntStream.range(14, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
@@ -1829,7 +1829,8 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
.setPartitionMetadata(Collections.emptyMap())
.setInstantsRollback(Collections.emptyList())
.build();
- HoodieTestTable.of(metaClient).addRollback(rollbackTime,
hoodieRollbackMetadata, isEmpty);
+ HoodieTestTable.of(metaClient).addRollback(rollbackTime,
hoodieRollbackMetadata, isEmpty, null);
+ HoodieTestTable.of(metaClient).addRollbackCompleted(rollbackTime,
hoodieRollbackMetadata, isEmpty);
}
return new HoodieInstant(inflight, "rollback", rollbackTime);
}
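
For readers of this patch, a minimal sketch (names and timestamps are
illustrative) of how a test can exercise the new two-step rollback helpers,
where addRollback now writes only the requested and inflight meta files and
addRollbackCompleted writes the completed instant:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.hudi.avro.model.HoodieRollbackMetadata;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.testutils.HoodieTestTable;

    class RollbackLifecycleSketch {
      static void rollbackInTwoSteps(HoodieTableMetaClient metaClient) throws Exception {
        HoodieTestTable testTable = HoodieTestTable.of(metaClient);
        Map<String, List<String>> partitionFiles =
            Collections.singletonMap("p1", Collections.singletonList("f1"));
        // Build rollback metadata without extra rollback log files.
        HoodieRollbackMetadata metadata =
            testTable.getRollbackMetadata("002", partitionFiles, false);
        // Step 1: requested + inflight instants (no rollback plan in this sketch).
        testTable.addRollback("003", metadata, false, null);
        // Step 2: the completed rollback instant.
        testTable.addRollbackCompleted("003", metadata, false);
      }
    }
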
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 46ce709751b..fbc88bb2718 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -482,24 +482,21 @@ public class TestHoodieSparkMergeOnReadTableRollback
extends SparkClientFunction
copyOfRecords.clear();
- // Rollback latest commit first
- client.restoreToInstant("000", cfg.isMetadataTableEnabled());
+ // Restore to 3rd commit.
+ client.restoreToInstant("003", cfg.isMetadataTableEnabled());
- metaClient = HoodieTableMetaClient.reload(metaClient);
+ metaClient.reloadActiveTimeline();
allFiles = listAllBaseFilesInPath(hoodieTable);
- tableView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient,
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
- assertFalse(dataFilesToRead.findAny().isPresent());
+ assertFalse(dataFilesToRead.filter(hoodieBaseFile ->
hoodieBaseFile.getCommitTime().compareTo("003") > 0).findAny().isPresent());
TableFileSystemView.SliceView rtView =
getHoodieTableFileSystemView(metaClient,
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView)
rtView).getAllFileGroups().collect(Collectors.toList());
- assertTrue(fileGroups.isEmpty());
-
- // make sure there are no log files remaining
- assertEquals(0L, ((HoodieTableFileSystemView) rtView).getAllFileGroups()
- .filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f ->
f.getLogFiles().count() == 0))
- .count());
+ assertFalse(fileGroups.isEmpty());
+ assertFalse(fileGroups.stream().anyMatch(fileGroup ->
fileGroup.getAllFileSlices()
+ .anyMatch(fileSlice -> fileSlice.getBaseInstantTime().compareTo("003")
> 0)));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 60b87020c11..7d8416c368a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -337,6 +337,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
// downgrade to version 3 and check TABLE_CHECKSUM is still present
new UpgradeDowngrade(metaClient, cfg, context,
SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.THREE, null);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
assertTableVersionOnDataAndMetadataTable(metaClient,
HoodieTableVersion.THREE);
assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key()));
assertEquals(checksum,
metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));
@@ -389,7 +390,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
Map<String, String> params = new HashMap<>();
addNewTableParamsToProps(params, tableName);
Properties properties = new Properties();
- params.forEach((k,v) -> properties.setProperty(k, v));
+ params.forEach((k, v) -> properties.setProperty(k, v));
initMetaClient(getTableType(), properties);
// init config, table and client.
@@ -453,7 +454,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
private void doInsertWithDefaultPartition(SparkRDDWriteClient client) {
// Write 1 (only inserts)
- dataGen = new HoodieTestDataGenerator(new
String[]{DEPRECATED_DEFAULT_PARTITION_PATH});
+ dataGen = new HoodieTestDataGenerator(new String[]
{DEPRECATED_DEFAULT_PARTITION_PATH});
String commit1 = "005";
client.startCommitWithTime(commit1);
List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
@@ -463,7 +464,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
private void doInsertWithDefaultHiveStylePartition(SparkRDDWriteClient
client) {
// Write 1 (only inserts)
- dataGen = new HoodieTestDataGenerator(new String[]{"partition_path=" +
DEPRECATED_DEFAULT_PARTITION_PATH});
+ dataGen = new HoodieTestDataGenerator(new String[] {"partition_path=" +
DEPRECATED_DEFAULT_PARTITION_PATH});
String commit1 = "005";
client.startCommitWithTime(commit1);
List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
@@ -536,7 +537,6 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
.withMetadataIndexBloomFilter(true)
.withEnableRecordIndex(true).build())
.build();
- HoodieTable table = getHoodieTable(metaClient, config);
for (MetadataPartitionType partitionType : MetadataPartitionType.values())
{
metaClient.getTableConfig().setMetadataPartitionState(metaClient,
partitionType, true);
}
@@ -571,7 +571,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
prepForDowngradeFromVersion(HoodieTableVersion.SIX);
new UpgradeDowngrade(metaClient, config, context,
SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.FIVE, null);
-
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// validate the relevant table states after downgrade
assertFalse(Files.exists(recordIndexPartitionPath), "record index
partition should be deleted.");
assertEquals(allPartitionsExceptRecordIndex,
metaClient.getTableConfig().getMetadataPartitions(),
@@ -638,7 +638,7 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
// assert marker files
assertMarkerFilesForDowngrade(table, commitInstant, toVersion ==
HoodieTableVersion.ONE);
}
-
+
// verify hoodie.table.version got downgraded
metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
.setLayoutVersion(Option.of(new
TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
@@ -790,17 +790,17 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
/**
* Create two commits and may or may not commit 2nd commit.
*
- * @param firstPartitionCommit2FileSlices list to hold file slices in first
partition.
+ * @param firstPartitionCommit2FileSlices list to hold file slices in first
partition.
* @param secondPartitionCommit2FileSlices list of hold file slices from
second partition.
- * @param cfg instance of {@link HoodieWriteConfig}
- * @param client instance of {@link SparkRDDWriteClient} to use.
- * @param commitSecondUpsert true if 2nd commit needs to be committed. false
otherwise.
+ * @param cfg instance of {@link
HoodieWriteConfig}
+ * @param client instance of {@link
SparkRDDWriteClient} to use.
+ * @param commitSecondUpsert true if 2nd commit needs to be
committed. false otherwise.
* @return a pair of list of records from 1st and 2nd batch.
*/
private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoPartitions(List<FileSlice>
firstPartitionCommit2FileSlices,
- List<FileSlice> secondPartitionCommit2FileSlices,
- HoodieWriteConfig cfg, SparkRDDWriteClient client,
- boolean commitSecondUpsert) throws IOException {
+
List<FileSlice> secondPartitionCommit2FileSlices,
+
HoodieWriteConfig cfg, SparkRDDWriteClient client,
+
boolean commitSecondUpsert) throws IOException {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[]
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data
@@ -857,11 +857,11 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
* @param table instance of {@link HoodieTable}
*/
private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
- List<HoodieInstant> instantsToBeParsed =
+ List<HoodieInstant> instantsToBeParsed =
metaClient.getActiveTimeline()
- .getCommitsTimeline()
- .getInstantsAsStream()
- .collect(Collectors.toList());
+ .getCommitsTimeline()
+ .getInstantsAsStream()
+ .collect(Collectors.toList());
for (HoodieInstant instant : instantsToBeParsed) {
WriteMarkers writeMarkers =
WriteMarkersFactory.get(table.getConfig().getMarkersType(), table,
instant.getTimestamp());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 062f50c21e9..6a1f1845816 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -49,7 +49,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
// Meta fields are not populated by default for metadata table
public static final boolean DEFAULT_METADATA_POPULATE_META_FIELDS = false;
// Default number of commits to retain, without cleaning, on metadata table
- public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 3;
+ public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10;
public static final String METADATA_PREFIX = "hoodie.metadata";
public static final String OPTIMIZED_LOG_BLOCKS_SCAN =
".optimized.log.blocks.scan.enable";
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 6182bc4d4eb..a5d56c91d5e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -168,6 +168,12 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) &&
!s.isCompleted()), details);
}
+ @Override
+ public HoodieTimeline filterRequestedRollbackTimeline() {
+ return new HoodieDefaultTimeline(getInstantsAsStream().filter(
+ s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) &&
s.isRequested()), details);
+ }
+
@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 72a6e910b76..51fe34badd2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -226,6 +226,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterPendingRollbackTimeline();
+ /**
+ * Filter this timeline for requested rollbacks.
+ */
+ HoodieTimeline filterRequestedRollbackTimeline();
+
/**
* Create a new Timeline with all the instants after startTs.
*/
@@ -454,6 +459,10 @@ public interface HoodieTimeline extends Serializable {
return instant.isRequested() ? instant :
HoodieTimeline.getRequestedInstant(instant);
}
+ static HoodieInstant getRestoreRequestedInstant(HoodieInstant instant) {
+ return instant.isRequested() ? instant :
HoodieTimeline.getRequestedInstant(instant);
+ }
+
static HoodieInstant getIndexRequestedInstant(final String timestamp) {
return new HoodieInstant(State.REQUESTED, INDEXING_ACTION, timestamp);
}
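
As a usage sketch (assuming a loaded meta client), the new
filterRequestedRollbackTimeline() lets a caller find rollbacks that have a
plan on the timeline but are still in REQUESTED state:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    class RequestedRollbacksSketch {
      static void printRequestedRollbacks(HoodieTableMetaClient metaClient) {
        HoodieTimeline requestedRollbacks =
            metaClient.getActiveTimeline().filterRequestedRollbackTimeline();
        // Each instant here is a rollback that was planned but never started.
        requestedRollbacks.getInstantsAsStream()
            .map(HoodieInstant::getTimestamp)
            .forEach(System.out::println);
      }
    }
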
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 56f478e781c..384c96a664e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -78,6 +78,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -118,24 +119,35 @@ public class HoodieTableMetadataUtil {
public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
- // Suffix to use for compaction
- private static final String COMPACTION_TIMESTAMP_SUFFIX = "001";
+ // Suffix to use for various operations on MDT
+ private enum OperationSuffix {
+ COMPACTION("001"),
+ CLEAN("002"),
+ RESTORE("003"),
+ METADATA_INDEXER("004"),
+ LOG_COMPACTION("005"),
+ ROLLBACK("006");
+ static final Set<String> ALL_SUFFIXES =
Arrays.stream(OperationSuffix.values()).map(OperationSuffix::getSuffix).collect(Collectors.toSet());
- // Suffix to use for clean
- private static final String CLEAN_TIMESTAMP_SUFFIX = "002";
+ private final String suffix;
- // This suffix used by the delta commits from async indexer
(`HoodieIndexer`),
- // when the `indexUptoInstantTime` already exists in the metadata table,
- // to avoid collision.
- public static final String METADATA_INDEXER_TIME_SUFFIX = "004";
+ OperationSuffix(String suffix) {
+ this.suffix = suffix;
+ }
- // Suffix to use for log compaction
- private static final String LOG_COMPACTION_TIMESTAMP_SUFFIX = "005";
+ String getSuffix() {
+ return suffix;
+ }
+
+ static boolean isValidSuffix(String suffix) {
+ return ALL_SUFFIXES.contains(suffix);
+ }
+ }
// This suffix and all after that are used for initialization of the various
partitions. The unused suffixes lower than this value
// are reserved for future operations on the MDT.
- public static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; //
corresponds to "010";
+ private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; //
corresponds to "010";
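
To make the suffix scheme concrete, a small illustration using the public
helpers defined later in this file (the data-table instant time below is made
up); each MDT operation derives its instant time from the DT timestamp plus a
fixed three-digit suffix:

    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    class SuffixSchemeSketch {
      public static void main(String[] args) {
        String dtInstant = "20230720051958123"; // hypothetical DT instant time
        // "001" compaction, "002" clean, "003" restore, "004" async indexer,
        // "005" log compaction, "006" rollback
        System.out.println(HoodieTableMetadataUtil.createCompactionTimestamp(dtInstant));
        System.out.println(HoodieTableMetadataUtil.createCleanTimestamp(dtInstant));
        System.out.println(HoodieTableMetadataUtil.createRestoreTimestamp(dtInstant));
        System.out.println(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(dtInstant));
        System.out.println(HoodieTableMetadataUtil.createLogCompactionTimestamp(dtInstant));
        System.out.println(HoodieTableMetadataUtil.createRollbackTimestamp(dtInstant));
      }
    }
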
/**
* Returns whether the files partition of metadata table is ready for read.
@@ -497,7 +509,6 @@ public class HoodieTableMetadataUtil {
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(new ArrayList<>(deletedFiles)));
-
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
@@ -515,6 +526,46 @@ public class HoodieTableMetadataUtil {
return records;
}
+ public static Map<MetadataPartitionType, HoodieData<HoodieRecord>>
convertMissingPartitionRecords(HoodieEngineContext engineContext,
+
List<String> deletedPartitions, Map<String, Map<String, Long>> filesAdded,
+
Map<String, List<String>> filesDeleted, String instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ int[] fileDeleteCount = {0};
+ int[] filesAddedCount = {0};
+
+ filesAdded.forEach((partition, filesToAdd) -> {
+ filesAddedCount[0] += filesToAdd.size();
+ Option<List<String>> filesToDelete = filesDeleted.containsKey(partition) ?
Option.of(filesDeleted.get(partition)) : Option.empty();
+ if (filesToDelete.isPresent()) {
+ fileDeleteCount[0] += filesToDelete.get().size();
+ }
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Option.of(filesToAdd), filesToDelete);
+ records.add(record);
+ });
+
+ // There could be partitions which only have missing deleted files.
+ filesDeleted.forEach((partition, deleted) -> {
+ Option<List<String>> filesToDelete = Option.of(deleted);
+ if (!filesAdded.containsKey(partition)) {
+ fileDeleteCount[0] += filesToDelete.get().size();
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
filesToDelete);
+ records.add(record);
+ }
+ });
+
+ if (!deletedPartitions.isEmpty()) {
+ // if there are partitions to be deleted, add them to delete list
+
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions,
true));
+ }
+
+ LOG.info("Re-adding missing records at " + instantTime + " during Restore.
#partitions_updated=" + records.size()
+ + ", #files_added=" + filesAddedCount[0] + ", #files_deleted=" +
fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
+ return Collections.singletonMap(MetadataPartitionType.FILES,
engineContext.parallelize(records, 1));
+ }
+
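
A hedged usage sketch of the new helper (engine context and file names are
placeholders); it emits one FILES-partition record per touched partition,
plus a partition-list delete record when partitions were dropped:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;
    import org.apache.hudi.metadata.MetadataPartitionType;

    class MissingPartitionRecordsSketch {
      static HoodieData<HoodieRecord> sketch(HoodieEngineContext context) {
        Map<String, Map<String, Long>> filesAdded = new HashMap<>();
        filesAdded.put("p1", Collections.singletonMap("base-file-1.parquet", 1024L));
        Map<String, List<String>> filesDeleted =
            Collections.singletonMap("p1", Collections.singletonList("stale-file.parquet"));
        return HoodieTableMetadataUtil.convertMissingPartitionRecords(
                context, Collections.emptyList(), filesAdded, filesDeleted,
                "20230720051958123")
            .get(MetadataPartitionType.FILES);
      }
    }
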
/**
* Convert clean metadata to bloom filter index records.
*
@@ -589,99 +640,61 @@ public class HoodieTableMetadataUtil {
});
}
- /**
- * Convert restore action metadata to metadata table records.
- */
- public static Map<MetadataPartitionType, HoodieData<HoodieRecord>>
convertMetadataToRecords(
- HoodieEngineContext engineContext, HoodieActiveTimeline
metadataTableTimeline, HoodieRestoreMetadata restoreMetadata,
- MetadataRecordsGenerationParams recordsGenerationParams, String
instantTime, Option<String> lastSyncTs) {
- final Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordsMap = new HashMap<>();
- final Map<String, Map<String, Long>> partitionToAppendedFiles = new
HashMap<>();
- final Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-
- processRestoreMetadata(metadataTableTimeline, restoreMetadata,
partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs);
- final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
-
engineContext.parallelize(convertFilesToFilesPartitionRecords(partitionToDeletedFiles,
partitionToAppendedFiles, instantTime, "Restore"), 1);
- partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecordsRDD);
-
- if
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
{
- final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
- convertFilesToBloomFilterRecords(engineContext,
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams,
instantTime);
- partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
metadataBloomFilterRecordsRDD);
- }
-
- if
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
{
- final HoodieData<HoodieRecord> metadataColumnStatsRDD =
- convertFilesToColumnStatsRecords(engineContext,
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
- partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS,
metadataColumnStatsRDD);
- }
- return partitionToRecordsMap;
- }
-
- /**
- * Aggregates all files deleted and appended to from all rollbacks
associated with a restore operation then
- * creates metadata table records for them.
- *
- * @param restoreMetadata - Restore action metadata
- * @return a list of metadata table records
- */
- private static void processRestoreMetadata(HoodieActiveTimeline
metadataTableTimeline,
- HoodieRestoreMetadata
restoreMetadata,
- Map<String, Map<String, Long>>
partitionToAppendedFiles,
- Map<String, List<String>>
partitionToDeletedFiles,
- Option<String> lastSyncTs) {
- restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms ->
rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm,
- partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)));
- }
-
/**
* Convert rollback action metadata to metadata table records.
+ * <p>
+ * We only need to handle the FILES partition here, as Hudi rollbacks on a
MOR table may end up adding new log files. All other partitions
+ * are handled by an actual rollback of the deltacommit which added records
to those partitions.
*/
public static Map<MetadataPartitionType, HoodieData<HoodieRecord>>
convertMetadataToRecords(
- HoodieEngineContext engineContext, HoodieActiveTimeline
metadataTableTimeline,
- HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams
recordsGenerationParams,
- String instantTime, Option<String> lastSyncTs, boolean wasSynced) {
- final Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordsMap = new HashMap<>();
- Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
- Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+ HoodieEngineContext engineContext, HoodieTableMetaClient
dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
- List<HoodieRecord> filesPartitionRecords =
- convertMetadataToRollbackRecords(metadataTableTimeline,
rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles,
instantTime, lastSyncTs, wasSynced);
- final HoodieData<HoodieRecord> rollbackRecordsRDD =
engineContext.parallelize(filesPartitionRecords, 1);
- partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD);
+ List<HoodieRecord> filesPartitionRecords =
convertMetadataToRollbackRecords(rollbackMetadata, instantTime,
dataTableMetaClient);
+ final HoodieData<HoodieRecord> rollbackRecordsRDD =
filesPartitionRecords.isEmpty() ? engineContext.emptyHoodieData()
+ : engineContext.parallelize(filesPartitionRecords,
filesPartitionRecords.size());
- if
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
{
- final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
- convertFilesToBloomFilterRecords(engineContext,
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams,
instantTime);
- partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
metadataBloomFilterRecordsRDD);
- }
+ return Collections.singletonMap(MetadataPartitionType.FILES,
rollbackRecordsRDD);
+ }
- if
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
{
- final HoodieData<HoodieRecord> metadataColumnStatsRDD =
- convertFilesToColumnStatsRecords(engineContext,
partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
- partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS,
metadataColumnStatsRDD);
+ private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient
dataTableMetaClient, String instantTime,
+ Map<String, Map<String,
Long>> partitionToFilesMap) {
+ HoodieInstant rollbackInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
instantTime);
+ HoodieInstant requested =
HoodieTimeline.getRollbackRequestedInstant(rollbackInstant);
+ try {
+ HoodieRollbackPlan rollbackPlan =
TimelineMetadataUtils.deserializeAvroMetadata(
+
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(),
HoodieRollbackPlan.class);
+
+ rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> {
+ final String partitionId =
getPartitionIdentifier(rollbackRequest.getPartitionPath());
+ partitionToFilesMap.computeIfAbsent(partitionId, s -> new HashMap<>());
+ // Fetch only the log files that are expected to be rolled back in the DT
as part of this rollback. These log files will not be deleted, but rendered
+ // invalid once the rollback is complete.
+ if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
+ Map<String, Long> logFiles = new HashMap<>();
+ rollbackRequest.getLogBlocksToBeDeleted().forEach((path, size) -> {
+ String fileName = path.substring(path.lastIndexOf("/") + 1);
+ // The rollback plan may not carry sizes for the log files to be rolled
back; the sizes get adjusted while merging with the original commits.
+ logFiles.put(fileName, 1L);
+ });
+ partitionToFilesMap.get(partitionId).putAll(logFiles);
+ }
+ });
+ } catch (IOException e) {
+ throw new HoodieMetadataException("Parsing rollback plan for " +
rollbackInstant + " failed", e);
}
-
- return partitionToRecordsMap;
}
/**
* Convert rollback action metadata to files partition records.
+ * Only the newly added rollback log files are considered.
*/
- private static List<HoodieRecord>
convertMetadataToRollbackRecords(HoodieActiveTimeline metadataTableTimeline,
-
HoodieRollbackMetadata rollbackMetadata,
-
Map<String, List<String>> partitionToDeletedFiles,
-
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ private static List<HoodieRecord>
convertMetadataToRollbackRecords(HoodieRollbackMetadata rollbackMetadata,
String
instantTime,
-
Option<String> lastSyncTs, boolean wasSynced) {
- processRollbackMetadata(metadataTableTimeline, rollbackMetadata,
partitionToDeletedFiles,
- partitionToAppendedFiles, lastSyncTs);
- if (!wasSynced) {
- // Since the instant-being-rolled-back was never committed to the
metadata table, the files added there
- // need not be deleted. For MOR Table, the rollback appends logBlocks so
we need to keep the appended files.
- partitionToDeletedFiles.clear();
- }
- return convertFilesToFilesPartitionRecords(partitionToDeletedFiles,
partitionToAppendedFiles, instantTime, "Rollback");
+
HoodieTableMetaClient dataTableMetaClient) {
+ Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+ processRollbackMetadata(rollbackMetadata, partitionToAppendedFiles);
+ reAddLogFilesFromRollbackPlan(dataTableMetaClient, instantTime,
partitionToAppendedFiles);
+ return convertFilesToFilesPartitionRecords(Collections.emptyMap(),
partitionToAppendedFiles, instantTime, "Rollback");
}
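
Putting the pieces together, a hedged sketch of the new conversion entry
point: the MDT writer now performs an actual rollback of the corresponding
deltacommit, so this conversion only needs to re-add rollback log files (MOR)
to the FILES partition:

    import java.util.Map;
    import org.apache.hudi.avro.model.HoodieRollbackMetadata;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;
    import org.apache.hudi.metadata.MetadataPartitionType;

    class RollbackRecordsSketch {
      static HoodieData<HoodieRecord> filesPartitionRecords(
          HoodieEngineContext context, HoodieTableMetaClient dataTableMetaClient,
          HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
            HoodieTableMetadataUtil.convertMetadataToRecords(
                context, dataTableMetaClient, rollbackMetadata, instantTime);
        // Only FILES is populated; other MDT partitions are fixed up by
        // actually rolling back the corresponding MDT deltacommit.
        return records.get(MetadataPartitionType.FILES);
      }
    }
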
/**
@@ -690,86 +703,16 @@ public class HoodieTableMetadataUtil {
* During a rollback, files may be deleted (COW, MOR) or rollback blocks may
be appended to files (MOR only). This
* function extracts these file changes for each partition.
*
- * @param metadataTableTimeline Current timeline of the Metadata Table
* @param rollbackMetadata {@code HoodieRollbackMetadata}
- * @param partitionToDeletedFiles The {@code Map} to fill with files
deleted per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files
appended per partition and their sizes.
*/
- private static void processRollbackMetadata(HoodieActiveTimeline
metadataTableTimeline,
- HoodieRollbackMetadata
rollbackMetadata,
- Map<String, List<String>>
partitionToDeletedFiles,
- Map<String, Map<String, Long>>
partitionToAppendedFiles,
- Option<String> lastSyncTs) {
+ private static void processRollbackMetadata(HoodieRollbackMetadata
rollbackMetadata,
+ Map<String, Map<String, Long>>
partitionToAppendedFiles) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
- final String instantToRollback =
rollbackMetadata.getCommitsRollback().get(0);
// Has this rollback produced new files?
boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null &&
!pm.getRollbackLogFiles().isEmpty();
- boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles &&
pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
-
- // If instant-to-rollback has not been synced to metadata table yet then
there is no need to update metadata
- // This can happen in two cases:
- // Case 1: Metadata Table timeline is behind the instant-to-rollback.
- boolean shouldSkip = lastSyncTs.isPresent()
- && HoodieTimeline.compareTimestamps(instantToRollback,
HoodieTimeline.GREATER_THAN, lastSyncTs.get());
-
- if (!hasNonZeroRollbackLogFiles && shouldSkip) {
- LOG.info(String.format("Skipping syncing of rollbackMetadata at %s,
given metadata table is already synced upto to %s",
- instantToRollback, lastSyncTs.get()));
- return;
- }
-
- // Case 2: The instant-to-rollback was never committed to Metadata
Table. This can happen if the instant-to-rollback
- // was a failed commit (never completed).
- //
- // There are two cases for failed commit that we need to take care of:
- // 1) The commit was synced to metadata table successfully but the
dataset meta file switches state failed
- // (from INFLIGHT to COMPLETED), the committed files should be rolled
back thus the rollback metadata
- // can not be skipped, usually a failover should be triggered and the
metadata active timeline expects
- // to contain the commit, we could check whether the commit was synced
to metadata table
- // through HoodieActiveTimeline#containsInstant.
- //
- // 2) The commit synced to metadata table failed or was never synced
to metadata table,
- // in this case, the rollback metadata should be skipped.
- //
- // And in which case,
- //
metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())
- // returns true ?
- // It is most probably because of compaction rollback, we schedule a
compaction plan early in the timeline (say t1)
- // then after a long time schedule and execute the plan then try to
rollback it.
- //
- // scheduled execution rollback compaction actions
- // ----- t1 ----- t3 ----- t4 ----- dataset timeline
- //
- // ---------- t2 (archive) ----------- metadata timeline
- //
- // when at time t4, we commit the compaction rollback,the above check
returns true.
- HoodieInstant syncedInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
- if
(metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
{
- throw new HoodieMetadataException(String.format("The instant %s
required to sync rollback of %s has been archived",
- syncedInstant, instantToRollback));
- }
- shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
- if (!hasNonZeroRollbackLogFiles && shouldSkip) {
- LOG.info(String.format("Skipping syncing of rollbackMetadata at %s,
since this instant was never committed to Metadata Table",
- instantToRollback));
- return;
- }
-
final String partition = pm.getPartitionPath();
- if ((!pm.getSuccessDeleteFiles().isEmpty() ||
!pm.getFailedDeleteFiles().isEmpty()) && !shouldSkip) {
- if (!partitionToDeletedFiles.containsKey(partition)) {
- partitionToDeletedFiles.put(partition, new ArrayList<>());
- }
-
- // Extract deleted file name from the absolute paths saved in
getSuccessDeleteFiles()
- List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p
-> new Path(p).getName())
- .collect(Collectors.toList());
- if (!pm.getFailedDeleteFiles().isEmpty()) {
- deletedFiles.addAll(pm.getFailedDeleteFiles().stream().map(p -> new
Path(p).getName())
- .collect(Collectors.toList()));
- }
- partitionToDeletedFiles.get(partition).addAll(deletedFiles);
- }
+ final String partitionId = getPartitionIdentifier(partition);
BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
// if a file exists in both written log files and rollback log files,
we want to pick the one that is higher
@@ -778,13 +721,14 @@ public class HoodieTableMetadataUtil {
};
if (hasRollbackLogFiles) {
- if (!partitionToAppendedFiles.containsKey(partition)) {
- partitionToAppendedFiles.put(partition, new HashMap<>());
+ if (!partitionToAppendedFiles.containsKey(partitionId)) {
+ partitionToAppendedFiles.put(partitionId, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in
getAppendFiles()
pm.getRollbackLogFiles().forEach((path, size) -> {
- partitionToAppendedFiles.get(partition).merge(new
Path(path).getName(), size, fileMergeFn);
+ String fileName = new Path(path).getName();
+ partitionToAppendedFiles.get(partitionId).merge(fileName, size,
fileMergeFn);
});
}
});
@@ -793,9 +737,9 @@ public class HoodieTableMetadataUtil {
/**
* Convert rollback action metadata to files partition records.
*/
- private static List<HoodieRecord>
convertFilesToFilesPartitionRecords(Map<String, List<String>>
partitionToDeletedFiles,
-
Map<String, Map<String, Long>> partitionToAppendedFiles,
- String
instantTime, String operation) {
+ protected static List<HoodieRecord>
convertFilesToFilesPartitionRecords(Map<String, List<String>>
partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+
String instantTime, String operation) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileChangeCount = {0, 0}; // deletes, appends
@@ -1344,8 +1288,7 @@ public class HoodieTableMetadataUtil {
// have corresponding completed instant in the data table
validInstantTimestamps.addAll(
metadataMetaClient.getActiveTimeline()
- .filter(instant -> instant.isCompleted()
- && (isIndexingCommit(instant.getTimestamp()) ||
isLogCompactionInstant(instant)))
+ .filter(instant -> instant.isCompleted() &&
isValidInstant(instant))
.getInstantsAsStream()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()));
@@ -1360,11 +1303,48 @@ public class HoodieTableMetadataUtil {
validInstantTimestamps.addAll(getRollbackedCommits(instant,
datasetTimeline));
});
+ // add restore instants from MDT.
+
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+ .getInstants().forEach(instant ->
validInstantTimestamps.add(instant.getTimestamp()));
+
// SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid
timestamp
validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP,
PARTITION_INITIALIZATION_TIME_SUFFIX));
return validInstantTimestamps;
}
+ /**
+ * Checks if the Instant is a delta commit and has a valid suffix for
operations on MDT.
+ *
+ * @param instant {@code HoodieInstant} to check.
+ * @return {@code true} if the instant is valid.
+ */
+ public static boolean isValidInstant(HoodieInstant instant) {
+ // Should be a deltacommit
+ if (!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+ return false;
+ }
+
+ // Check the length: the timestamp should carry a three-digit suffix
beyond the timeline's standard timestamp format.
+ final String instantTime = instant.getTimestamp();
+ if (!(instantTime.length() == MILLIS_INSTANT_ID_LENGTH +
OperationSuffix.METADATA_INDEXER.getSuffix().length())) {
+ return false;
+ }
+
+ // Is this a fixed operation suffix?
+ final String suffix = instantTime.substring(instantTime.length() - 3);
+ if (OperationSuffix.isValidSuffix(suffix)) {
+ return true;
+ }
+
+ // Is this an index init suffix?
+ return suffix.compareTo(String.format("%03d",
PARTITION_INITIALIZATION_TIME_SUFFIX)) >= 0;
+ }
+
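
For illustration (assuming the standard 17-character millisecond instant
format), only deltacommits whose timestamps carry a recognized three-digit
suffix pass the check:

    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieInstant.State;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    class ValidInstantSketch {
      public static void main(String[] args) {
        // 17-char base timestamp + "006" rollback suffix: a valid MDT deltacommit.
        HoodieInstant suffixed = new HoodieInstant(State.COMPLETED,
            HoodieTimeline.DELTA_COMMIT_ACTION, "20230720051958123006");
        // A plain timestamp without a suffix is not a suffixed MDT operation.
        HoodieInstant plain = new HoodieInstant(State.COMPLETED,
            HoodieTimeline.DELTA_COMMIT_ACTION, "20230720051958123");
        System.out.println(HoodieTableMetadataUtil.isValidInstant(suffixed)); // true
        System.out.println(HoodieTableMetadataUtil.isValidInstant(plain));    // false
      }
    }
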
/**
* Checks if a delta commit in metadata table is written by async indexer.
* <p>
@@ -1375,20 +1355,8 @@ public class HoodieTableMetadataUtil {
* @return {@code true} if from async indexer; {@code false} otherwise.
*/
public static boolean isIndexingCommit(String instantTime) {
- return instantTime.length() == MILLIS_INSTANT_ID_LENGTH +
METADATA_INDEXER_TIME_SUFFIX.length()
- && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
- }
-
- /**
- * This method returns true if the instant provided belongs to Log
compaction instant.
- * For metadata table, log compaction instant are created with Suffix "004"
provided in LOG_COMPACTION_TIMESTAMP_SUFFIX.
- * @param instant Hoodie completed instant.
- * @return true for logcompaction instants flase otherwise.
- */
- public static boolean isLogCompactionInstant(HoodieInstant instant) {
- return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)
- && instant.getTimestamp().length() == MILLIS_INSTANT_ID_LENGTH +
LOG_COMPACTION_TIMESTAMP_SUFFIX.length()
- && instant.getTimestamp().endsWith(LOG_COMPACTION_TIMESTAMP_SUFFIX);
+ return instantTime.length() == MILLIS_INSTANT_ID_LENGTH +
OperationSuffix.METADATA_INDEXER.getSuffix().length()
+ &&
instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
}
/**
@@ -1590,14 +1558,26 @@ public class HoodieTableMetadataUtil {
* Create the timestamp for a clean operation on the metadata table.
*/
public static String createCleanTimestamp(String timestamp) {
- return timestamp + CLEAN_TIMESTAMP_SUFFIX;
+ return timestamp + OperationSuffix.CLEAN.getSuffix();
+ }
+
+ public static String createRollbackTimestamp(String timestamp) {
+ return timestamp + OperationSuffix.ROLLBACK.getSuffix();
+ }
+
+ public static String createRestoreTimestamp(String timestamp) {
+ return timestamp + OperationSuffix.RESTORE.getSuffix();
+ }
+
+ public static String createAsyncIndexerTimestamp(String timestamp) {
+ return timestamp + OperationSuffix.METADATA_INDEXER.getSuffix();
}
/**
* Create the timestamp for a compaction operation on the metadata table.
*/
public static String createCompactionTimestamp(String timestamp) {
- return timestamp + COMPACTION_TIMESTAMP_SUFFIX;
+ return timestamp + OperationSuffix.COMPACTION.getSuffix();
}
/**
@@ -1614,7 +1594,7 @@ public class HoodieTableMetadataUtil {
* Create the timestamp for a log compaction operation on the metadata table.
*/
public static String createLogCompactionTimestamp(String timestamp) {
- return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
+ return timestamp + OperationSuffix.LOG_COMPACTION.getSuffix();
}
/**
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 37bd983f17b..49095683a2b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
@@ -368,13 +369,23 @@ public class HoodieTestTable {
return this;
}
- public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata) throws IOException {
- return addRollback(instantTime, rollbackMetadata, false);
+ public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata, HoodieRollbackPlan rollbackPlan) throws
IOException {
+ return addRollback(instantTime, rollbackMetadata, false, rollbackPlan);
}
- public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
- createRequestedRollbackFile(basePath, instantTime);
+ public HoodieTestTable addRollback(String instantTime,
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty, HoodieRollbackPlan
rollbackPlan) throws IOException {
+ if (rollbackPlan != null) {
+ createRequestedRollbackFile(basePath, instantTime, rollbackPlan);
+ } else {
+ createRequestedRollbackFile(basePath, instantTime);
+ }
createInflightRollbackFile(basePath, instantTime);
+ currentInstantTime = instantTime;
+ return this;
+ }
+
+ public HoodieTestTable addRollbackCompleted(String instantTime,
HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty);
currentInstantTime = instantTime;
return this;
@@ -386,7 +397,7 @@ public class HoodieTestTable {
return this;
}
- public HoodieRollbackMetadata getRollbackMetadata(String
instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta) throws
Exception {
+ public HoodieRollbackMetadata getRollbackMetadata(String
instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta, boolean
shouldAddRollbackLogFile) throws Exception {
HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata();
rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete));
rollbackMetadata.setStartRollbackTime(instantTimeToDelete);
@@ -396,11 +407,13 @@ public class HoodieTestTable {
rollbackPartitionMetadata.setPartitionPath(entry.getKey());
rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue());
rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>());
- long rollbackLogFileSize = 50 + RANDOM.nextInt(500);
- String fileId = UUID.randomUUID().toString();
- String logFileName = logFileName(instantTimeToDelete, fileId, 0);
- FileCreateUtils.createLogFile(basePath, entry.getKey(),
instantTimeToDelete, fileId, 0, (int) rollbackLogFileSize);
- rollbackPartitionMetadata.setRollbackLogFiles(singletonMap(logFileName,
rollbackLogFileSize));
+ if (shouldAddRollbackLogFile) {
+ long rollbackLogFileSize = 50 + RANDOM.nextInt(500);
+ String fileId = UUID.randomUUID().toString();
+ String logFileName = logFileName(instantTimeToDelete, fileId, 0);
+ FileCreateUtils.createLogFile(basePath, entry.getKey(),
instantTimeToDelete, fileId, 0, (int) rollbackLogFileSize);
+
rollbackPartitionMetadata.setRollbackLogFiles(singletonMap(logFileName,
rollbackLogFileSize));
+ }
partitionMetadataMap.put(entry.getKey(), rollbackPartitionMetadata);
}
rollbackMetadata.setPartitionMetadata(partitionMetadataMap);
@@ -819,11 +832,24 @@ public class HoodieTestTable {
throw new IllegalArgumentException("Instant to rollback not present in
timeline: " + commitTimeToRollback);
}
Map<String, List<String>> partitionFiles =
getPartitionFiles(commitMetadata.get());
- HoodieRollbackMetadata rollbackMetadata =
getRollbackMetadata(commitTimeToRollback, partitionFiles);
+ HoodieRollbackMetadata rollbackMetadata =
getRollbackMetadata(commitTimeToRollback, partitionFiles, false);
for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
deleteFilesInPartition(entry.getKey(), entry.getValue());
}
- return addRollback(commitTime, rollbackMetadata);
+ HoodieRollbackPlan rollbackPlan = getHoodieRollbackPlan(commitTime,
partitionFiles);
+ HoodieTestTable testTable = addRollback(commitTime, rollbackMetadata,
rollbackPlan);
+ return testTable.addRollbackCompleted(commitTime, rollbackMetadata, false);
+ }
+
+ private HoodieRollbackPlan getHoodieRollbackPlan(String commitTime,
Map<String, List<String>> partitionFiles) {
+ HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+ List<HoodieRollbackRequest> rollbackRequestList =
partitionFiles.keySet().stream()
+ .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING,
EMPTY_STRING,
+ partitionFiles.get(partition), Collections.emptyMap()))
+ .collect(Collectors.toList());
+ rollbackPlan.setRollbackRequests(rollbackRequestList);
+ rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime,
HoodieTimeline.COMMIT_ACTION));
+ return rollbackPlan;
}
public HoodieTestTable doRollbackWithExtraFiles(String commitTimeToRollback,
String commitTime, Map<String, List<String>> extraFiles) throws Exception {
@@ -841,8 +867,10 @@ public class HoodieTestTable {
partitionFiles.get(entry.getKey()).addAll(entry.getValue());
}
}
- HoodieRollbackMetadata rollbackMetadata =
getRollbackMetadata(commitTimeToRollback, partitionFiles);
- return addRollback(commitTime, rollbackMetadata);
+ HoodieRollbackMetadata rollbackMetadata =
getRollbackMetadata(commitTimeToRollback, partitionFiles, false);
+ HoodieRollbackPlan rollbackPlan = getHoodieRollbackPlan(commitTime,
partitionFiles);
+ HoodieTestTable testTable = addRollback(commitTime, rollbackMetadata,
rollbackPlan);
+ return testTable.addRollbackCompleted(commitTime, rollbackMetadata, false);
}
public HoodieTestTable doRestore(String commitToRestoreTo, String
restoreTime) throws Exception {
@@ -857,7 +885,7 @@ public class HoodieTestTable {
}
Map<String, List<String>> partitionFiles =
getPartitionFiles(commitMetadata.get());
rollbackMetadataMap.put(commitInstantToRollback.getTimestamp(),
-
Collections.singletonList(getRollbackMetadata(commitInstantToRollback.getTimestamp(),
partitionFiles)));
+
Collections.singletonList(getRollbackMetadata(commitInstantToRollback.getTimestamp(),
partitionFiles, false)));
for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
deleteFilesInPartition(entry.getKey(), entry.getValue());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index c4b8f9eb204..ee2f50cb20c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -278,19 +278,18 @@ public class TestStreamWriteOperatorCoordinator {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(10));
- assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant + "002"));
- assertThat(completedTimeline.lastInstant().get().getAction(),
is(HoodieTimeline.CLEAN_ACTION));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(9));
// write another commit
mockWriteWithMetadata();
+
// write another commit
instant = mockWriteWithMetadata();
// write another commit to trigger compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(14));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(13));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
is(instant + "001"));
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(),
is(HoodieTimeline.COMMIT_ACTION));
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 7f624033469..d94f065ee0a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -256,6 +256,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass)
.withBootstrapModeForRegexMatch(modeForRegexMatch).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
.build();
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 6730ba591d6..e1acde4fcd6 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -65,7 +65,6 @@ import static
org.apache.hudi.common.table.HoodieTableMetaClient.reload;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static
org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS;
import static
org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -186,7 +185,7 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
context(), metadataConfig, metaClient.getBasePathV2().toString());
HoodieTableMetaClient metadataMetaClient =
metadata.getMetadataMetaClient();
- String mdtCommitTime = indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX;
+ String mdtCommitTime =
HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime);
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
// Reverts both instants to inflight state, to simulate inflight indexing
instants