nsivabalan commented on code in PR #8837:
URL: https://github.com/apache/hudi/pull/8837#discussion_r1239143776
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -669,32 +669,51 @@ public void restoreToSavepoint() {
* @param savepointTime Savepoint time to rollback to
*/
public void restoreToSavepoint(String savepointTime) {
- boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
- if (initialMetadataTableIfNecessary) {
+ boolean initializeMetadataTableIfNecessary =
config.isMetadataTableEnabled();
+ if (initializeMetadataTableIfNecessary) {
try {
- // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and beforeTimelineStarts
+ // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+ // or before the oldest compaction on MDT.
+ // We cannot restore to before the oldest compaction on MDT as we
don't have the basefiles before that time.
String metadataTableBasePathStr =
HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
HoodieTableMetaClient mdtClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
- // Same as HoodieTableMetadataUtil#processRollbackMetadata
+ Option<HoodieInstant> latestMdtCompaction =
mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+ boolean deleteMDT = false;
+ if (latestMdtCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(savepointTime,
latestMdtCompaction.get().getTimestamp())) {
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than oldest compaction %s on MDT",
+ savepointTime, latestMdtCompaction.get().getTimestamp()));
+ deleteMDT = true;
+ }
+ }
+
HoodieInstant syncedInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
// The instant required to sync rollback to MDT has been archived and
the mdt syncing will be failed
// So that we need to delete the whole MDT here.
if
(mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
{
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than the MDT timeline %s",
Review Comment:
we can do a minor optimization here.
```
if (!deleteMDT && ...)
```
because if the previous block has already decided to delete the MDT, we don't need
to check the remaining conditions.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata,
String instantTime) {
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime()), false);
- closeInternal();
+ dataMetaClient.reloadActiveTimeline();
+
+ // Since the restore has completed on the dataset, the latest write
timeline instant is the one to which the
+ // restore was performed. This should be always present.
+ final String restoreToInstantTime =
dataMetaClient.getActiveTimeline().getWriteTimeline()
+ .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+ // We cannot restore to before the oldest compaction on MDT as we don't
have the basefiles before that time.
+ Option<HoodieInstant> lastCompaction =
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
Review Comment:
Is it the oldest compaction or the latest compaction? The Javadoc does not align
with the code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata,
String instantTime) {
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime()), false);
- closeInternal();
+ dataMetaClient.reloadActiveTimeline();
+
+ // Since the restore has completed on the dataset, the latest write
timeline instant is the one to which the
+ // restore was performed. This should be always present.
+ final String restoreToInstantTime =
dataMetaClient.getActiveTimeline().getWriteTimeline()
+ .getReverseOrderedInstants().findFirst().get().getTimestamp();
Review Comment:
+1
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -669,32 +669,51 @@ public void restoreToSavepoint() {
* @param savepointTime Savepoint time to rollback to
*/
public void restoreToSavepoint(String savepointTime) {
- boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
- if (initialMetadataTableIfNecessary) {
+ boolean initializeMetadataTableIfNecessary =
config.isMetadataTableEnabled();
+ if (initializeMetadataTableIfNecessary) {
try {
- // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and beforeTimelineStarts
+ // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+ // or before the oldest compaction on MDT.
+ // We cannot restore to before the oldest compaction on MDT as we
don't have the basefiles before that time.
String metadataTableBasePathStr =
HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
HoodieTableMetaClient mdtClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
- // Same as HoodieTableMetadataUtil#processRollbackMetadata
+ Option<HoodieInstant> latestMdtCompaction =
mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant();
Review Comment:
Shouldn't this be the earliest/first instant instead of lastInstant(), as per the
comments given around L675?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -669,32 +669,51 @@ public void restoreToSavepoint() {
* @param savepointTime Savepoint time to rollback to
*/
public void restoreToSavepoint(String savepointTime) {
- boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
- if (initialMetadataTableIfNecessary) {
+ boolean initializeMetadataTableIfNecessary =
config.isMetadataTableEnabled();
+ if (initializeMetadataTableIfNecessary) {
try {
- // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and beforeTimelineStarts
+ // Delete metadata table directly when users trigger savepoint
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+ // or before the oldest compaction on MDT.
+ // We cannot restore to before the oldest compaction on MDT as we
don't have the basefiles before that time.
String metadataTableBasePathStr =
HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
HoodieTableMetaClient mdtClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
- // Same as HoodieTableMetadataUtil#processRollbackMetadata
+ Option<HoodieInstant> latestMdtCompaction =
mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+ boolean deleteMDT = false;
+ if (latestMdtCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(savepointTime,
latestMdtCompaction.get().getTimestamp())) {
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than oldest compaction %s on MDT",
+ savepointTime, latestMdtCompaction.get().getTimestamp()));
+ deleteMDT = true;
+ }
+ }
+
HoodieInstant syncedInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
// The instant required to sync rollback to MDT has been archived and
the mdt syncing will be failed
// So that we need to delete the whole MDT here.
if
(mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp()))
{
+ LOG.warn(String.format("Deleting MDT during restore to %s as the
savepoint is older than the MDT timeline %s",
+ savepointTime,
mdtClient.getCommitsTimeline().firstInstant().get().getTimestamp()));
+ deleteMDT = true;
+ }
+
+ if (deleteMDT) {
+ // TODO: this should use the correct API to delete from
HoodieTableMetadataUtil so hoodie.properties is also updated
+ // To be fixed after HUDI-6200
Review Comment:
Can you take care of this now, since the other patch has landed?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata,
String instantTime) {
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime()), false);
- closeInternal();
+ dataMetaClient.reloadActiveTimeline();
+
+ // Since the restore has completed on the dataset, the latest write
timeline instant is the one to which the
+ // restore was performed. This should be always present.
+ final String restoreToInstantTime =
dataMetaClient.getActiveTimeline().getWriteTimeline()
+ .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+ // We cannot restore to before the oldest compaction on MDT as we don't
have the basefiles before that time.
+ Option<HoodieInstant> lastCompaction =
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+ if (lastCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime,
lastCompaction.get().getTimestamp())) {
+ String msg = String.format("Cannot restore MDT to %s because it is
older than latest compaction at %s", restoreToInstantTime,
+ lastCompaction.get().getTimestamp()) + ". Please delete MDT and
restore again";
+ LOG.error(msg);
+ throw new HoodieMetadataException(msg);
+ }
+ }
+
+ // Restore requires the existing pipelines to be shutdown. So we can
safely scan the dataset to find the current
+ // list of files in the filesystem.
+ List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+ Map<String, DirectoryInfo> dirInfoMap =
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath,
Function.identity()));
+ dirInfoList.clear();
+
+ LOG.info("Restoring MDT to " + restoreToInstantTime + " at " +
instantTime);
+ getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+ // At this point we have also reverted the cleans which have occurred
after the restoreToInstantTime. Hence, a sync
+ // is required to bring back those cleans.
+ try {
+ initTableMetadata();
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata();
+ Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
+ for (String partition : metadata.fetchAllPartitionPaths()) {
+ FileStatus[] metadataFiles = metadata.getAllFilesInPartition(new
Path(dataWriteConfig.getBasePath(), partition));
+ if (!dirInfoMap.containsKey(partition)) {
+ // Entire partition has been deleted
+ List<String> filePaths = Arrays.stream(metadataFiles).map(f ->
f.getPath().getName()).collect(Collectors.toList());
+ HoodieCleanPartitionMetadata cleanPartitionMetadata = new
HoodieCleanPartitionMetadata(partition, "", filePaths, filePaths,
+ Collections.emptyList(), true);
+ partitionMetadata.put(partition, cleanPartitionMetadata);
+ } else {
+ // Some files cleaned in the partition
+ Map<String, Long> fsFiles =
dirInfoMap.get(partition).getFileNameToSizeMap();
+ List<String> filesDeleted = Arrays.stream(metadataFiles).map(f ->
f.getPath().getName())
+ .filter(n ->
!fsFiles.containsKey(n)).collect(Collectors.toList());
+ if (!filesDeleted.isEmpty()) {
+ LOG.info("Found deleted files in partition " + partition + ": " +
filesDeleted);
+ HoodieCleanPartitionMetadata cleanPartitionMetadata = new
HoodieCleanPartitionMetadata(partition, "", filesDeleted, filesDeleted,
+ Collections.EMPTY_LIST, false);
+ partitionMetadata.put(partition, cleanPartitionMetadata);
+ }
+ }
+ }
+ cleanMetadata.setPartitionMetadata(partitionMetadata);
+
+ // Even if we don't have any deleted files to sync, we still create an
empty commit so that we can track the restore has completed.
+ // We cannot create a deltaCommit at instantTime now because a future
block (rollback) has already been written to the logFiles.
+ // We need to choose a timestamp which would be a validInstantTime for
MDT. This is either a commit timestamp completed on the dataset
+ // or a timestamp with suffix which we use for MDT clean, compaction etc.
+ // String syncCommitTime =
HoodieTableMetadataUtil.createIndexInitTimestamp(HoodieActiveTimeline.createNewInstantTime());
+ // TODO: Using METADATA_INDEXER_TIME_SUFFIX for now, but this should
have its own suffix. To be fixed after HUDI-6200
+ String syncCommitTime = HoodieActiveTimeline.createNewInstantTime() +
METADATA_INDEXER_TIME_SUFFIX;
Review Comment:
@codope: gentle ping.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -625,6 +625,9 @@ private static void
processRestoreMetadata(HoodieActiveTimeline metadataTableTim
/**
* Convert rollback action metadata to metadata table records.
+ * <p>
+ * We only need to handle FILES partition here as HUDI rollbacks on MOR
table may end up adding a new log file. All other partitions
+ * are handled by actual rollback of the deltacommit which added records to
those partitions.
Review Comment:
Why do we do this? Triggering a rollback on the MDT will add rollback
blocks to all partitions in the MDT, right? So, as I understand it, no additional
handling should be required.
Or am I missing something?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata,
String instantTime) {
*/
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
- if (enabled && metadata != null) {
- // Is this rollback of an instant that has been synced to the metadata
table?
- String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
- boolean wasSynced =
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
- if (!wasSynced) {
- // A compaction may have taken place on metadata table which would
have included this instant being rolled back.
- // Revisit this logic to relax the compaction fencing :
https://issues.apache.org/jira/browse/HUDI-2458
- Option<String> latestCompaction = metadata.getLatestCompactionTime();
- if (latestCompaction.isPresent()) {
- wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
- }
+ // The commit which is being rolled back on the dataset
+ final String commitInstantTime =
rollbackMetadata.getCommitsRollback().get(0);
+ // Find the deltacommits since the last compaction
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+ if (!deltaCommitsInfo.isPresent()) {
+ LOG.info(String.format("Ignoring rollback of instant %s at %s since
there are no deltacommits on MDT", commitInstantTime, instantTime));
+ return;
+ }
+
+ // This could be a compaction or deltacommit instant (See
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+ HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+ HoodieTimeline deltacommitsSinceCompaction =
deltaCommitsInfo.get().getKey();
+
+ // The deltacommit that will be rolled back
+ HoodieInstant deltaCommitInstant = new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime);
+
+ // The commit being rolled back should not be older than the latest
compaction on the MDT. Compaction on MDT only occurs when all actions
+ // are completed on the dataset. Hence, this case implies a rollback of
completed commit which should actually be handled using restore.
+ if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+ final String compactionInstantTime = compactionInstant.getTimestamp();
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime,
compactionInstantTime)) {
+ throw new HoodieMetadataException(String.format("Commit being rolled
back %s is older than the latest compaction %s. "
+ + "There are %d deltacommits after this compaction: %s",
commitInstantTime, compactionInstantTime,
+ deltacommitsSinceCompaction.countInstants(),
deltacommitsSinceCompaction.getInstants()));
}
+ }
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
- HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
metadataMetaClient.getActiveTimeline(),
- rollbackMetadata, getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime(), wasSynced);
- commit(instantTime, records, false);
- closeInternal();
+ if (deltaCommitsInfo.get().getKey().containsInstant(deltaCommitInstant)) {
+ LOG.info("Rolling back MDT deltacommit " + commitInstantTime);
+ if (!getWriteClient().rollback(commitInstantTime, instantTime)) {
+ throw new HoodieMetadataException("Failed to rollback deltacommit at "
+ commitInstantTime);
+ }
+ } else {
+ LOG.info(String.format("Ignoring rollback of instant %s at %s since
there are no corresponding deltacommits on MDT",
+ commitInstantTime, instantTime));
}
+
+ // Rollback of MOR table may end up adding a new log file. So we need to
check for added files and add them to MDT
+ processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
metadataMetaClient.getActiveTimeline(),
+ rollbackMetadata, getRecordsGenerationParams(), instantTime,
+ metadata.getSyncedInstantTime(), true), false);
Review Comment:
Yeah, I'm not sure I understand why we do this. The rollback at L952 only adds a
rollback to the MDT, right? So no new files will be added to the data table. What
exactly are we trying to do here?
Also, with this change, we are adding new instant times to the MDT timeline
that may not be present in the data table timeline. Did we make sure these are
accounted for in validTimestamps in the LogRecordReader?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata,
String instantTime) {
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
- metadata.getSyncedInstantTime()), false);
- closeInternal();
+ dataMetaClient.reloadActiveTimeline();
+
+ // Since the restore has completed on the dataset, the latest write
timeline instant is the one to which the
+ // restore was performed. This should be always present.
+ final String restoreToInstantTime =
dataMetaClient.getActiveTimeline().getWriteTimeline()
+ .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+ // We cannot restore to before the oldest compaction on MDT as we don't
have the basefiles before that time.
+ Option<HoodieInstant> lastCompaction =
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+ if (lastCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime,
lastCompaction.get().getTimestamp())) {
+ String msg = String.format("Cannot restore MDT to %s because it is
older than latest compaction at %s", restoreToInstantTime,
+ lastCompaction.get().getTimestamp()) + ". Please delete MDT and
restore again";
+ LOG.error(msg);
+ throw new HoodieMetadataException(msg);
+ }
+ }
+
+ // Restore requires the existing pipelines to be shutdown. So we can
safely scan the dataset to find the current
+ // list of files in the filesystem.
+ List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+ Map<String, DirectoryInfo> dirInfoMap =
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath,
Function.identity()));
+ dirInfoList.clear();
+
+ LOG.info("Restoring MDT to " + restoreToInstantTime + " at " +
instantTime);
+ getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+ // At this point we have also reverted the cleans which have occurred
after the restoreToInstantTime. Hence, a sync
Review Comment:
I mean specifically the case where the timeline being restored contains cleans.
I did check the test testTableOperationsWithRestore, but I
don't think it covers this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]