This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.11.0 by this push:
new d6ba8d8bb7 Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)" (#5321)
d6ba8d8bb7 is described below
commit d6ba8d8bb7d6026ea98bd03e3de87d800513dccb
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Apr 14 02:28:54 2022 -0700
Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)" (#5321)
…d of Com… (#4957)"
This reverts commit 98b4e9796e1e3e1f69954afa698ace5b28bde4a0.
Revert this only for release-0.11.0. The patch will be retained in master.
---
.../hudi/client/utils/MetadataConversionUtils.java | 17 +-
.../table/action/rollback/BaseRollbackHelper.java | 5 +
.../rollback/ListingBasedRollbackHelper.java | 150 ++++++++++
.../rollback/ListingBasedRollbackStrategy.java | 302 ++-------------------
.../hudi/table/action/rollback/RollbackUtils.java | 167 ++++++++++++
.../action/rollback/SerializablePathFilter.java | 26 --
.../table/upgrade/ZeroToOneUpgradeHandler.java | 34 ++-
.../TestMergeOnReadRollbackActionExecutor.java | 5 +-
.../hudi/common/model/HoodieCommitMetadata.java | 13 -
9 files changed, 368 insertions(+), 351 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 342de74a11..c0405161d8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.utils;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -33,7 +34,6 @@ import
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -186,19 +186,6 @@ public class MetadataConversionUtils {
return
Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
}
- public static Option<HoodieCommitMetadata>
getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant
hoodieInstant) throws IOException {
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
- HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-
- if
(hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
- HoodieReplaceCommitMetadata.class));
- }
- return
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
- HoodieCommitMetadata.class));
-
- }
-
public static org.apache.hudi.avro.model.HoodieCommitMetadata
convertCommitMetadata(
HoodieCommitMetadata hoodieCommitMetadata) {
ObjectMapper mapper = new ObjectMapper();
@@ -213,4 +200,4 @@ public class MetadataConversionUtils {
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY,
"");
return avroMetaData;
}
-}
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 8475afe16e..189de373d9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -213,4 +214,8 @@ public class BaseRollbackHelper implements Serializable {
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
+
+ public interface SerializablePathFilter extends PathFilter, Serializable {
+
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
new file mode 100644
index 0000000000..628b2fc372
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class ListingBasedRollbackHelper implements Serializable {
+ private static final Logger LOG =
LogManager.getLogger(ListingBasedRollbackHelper.class);
+
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieWriteConfig config;
+
+ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient,
HoodieWriteConfig config) {
+ this.metaClient = metaClient;
+ this.config = config;
+ }
+
+ /**
+ * Collects info for Rollback plan.
+ */
+ public List<HoodieRollbackRequest>
getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant
instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+ int sparkPartitions = Math.max(Math.min(rollbackRequests.size(),
config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback
Plan");
+ return getListingBasedRollbackRequests(context, instantToRollback,
rollbackRequests, sparkPartitions);
+ }
+
+ /**
+ * May be delete interested files and collect stats or collect stats only.
+ *
+ * @param context instance of {@link HoodieEngineContext} to use.
+ * @param instantToRollback {@link HoodieInstant} of interest for which
deletion or collect stats is requested.
+ * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to
be operated on.
+ * @param numPartitions number of spark partitions to use for
parallelism.
+ * @return stats collected with or w/o actual deletions.
+ */
+ private List<HoodieRollbackRequest>
getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant
instantToRollback,
+
List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) {
+ return context.map(rollbackRequests, rollbackRequest -> {
+ switch (rollbackRequest.getType()) {
+ case DELETE_DATA_FILES_ONLY: {
+ final FileStatus[] filesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ rollbackRequest.getPartitionPath(), metaClient.getFs());
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
+ EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.EMPTY_MAP);
+ }
+ case DELETE_DATA_AND_LOG_FILES: {
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), metaClient.getFs());
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+ }
+ case APPEND_ROLLBACK_BLOCK: {
+ String fileId = rollbackRequest.getFileId().get();
+ String latestBaseInstant =
rollbackRequest.getLatestBaseInstant().get();
+ HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get();
+
+ Path fullLogFilePath =
FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath());
+
+ Map<String, Long> logFilesWithBlocksToRollback =
+ Collections.singletonMap(fullLogFilePath.toString(),
writeStat.getTotalWriteBytes());
+
+ return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
fileId, latestBaseInstant,
+ Collections.EMPTY_LIST, logFilesWithBlocksToRollback);
+ }
+ default:
+ throw new IllegalStateException("Unknown Rollback action " +
rollbackRequest);
+ }
+ }, numPartitions);
+ }
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String
partitionPath, FileSystem fs) throws IOException {
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
+ if (path.toString().endsWith(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ } else if (FSUtils.isLogFile(path)) {
+ // Since the baseCommitTime is the only commit for new log files, it's
okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index ed37798607..e6355526e5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -18,42 +18,19 @@
package org.apache.hudi.table.action.rollback;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.jetbrains.annotations.NotNull;
-import static
org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
-import static
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+import java.io.IOException;
+import java.util.List;
/**
* Listing based rollback strategy to fetch list of {@link
HoodieRollbackRequest}s.
@@ -62,15 +39,12 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
private static final Logger LOG =
LogManager.getLogger(ListingBasedRollbackStrategy.class);
- protected final HoodieTable<?, ?, ?, ?> table;
-
- protected final transient HoodieEngineContext context;
-
+ protected final HoodieTable table;
+ protected final HoodieEngineContext context;
protected final HoodieWriteConfig config;
-
protected final String instantTime;
- public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
+ public ListingBasedRollbackStrategy(HoodieTable table,
HoodieEngineContext context,
HoodieWriteConfig config,
String instantTime) {
@@ -83,260 +57,20 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- List<String> partitionPaths =
- FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
- int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
-
- context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
-
- HoodieTableType tableType = table.getMetaClient().getTableType();
- String baseFileExtension = getBaseFileExtension(metaClient);
- Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(metaClient, instantToRollback);
- Boolean isCommitMetadataCompleted =
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
-
- return context.flatMap(partitionPaths, partitionPath -> {
- List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
- FileStatus[] filesToDelete =
- fetchFilesFromInstant(instantToRollback, partitionPath,
metaClient.getBasePath(), baseFileExtension,
- metaClient.getFs(), commitMetadataOptional,
isCommitMetadataCompleted);
-
- if (HoodieTableType.COPY_ON_WRITE == tableType) {
- hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
filesToDelete));
- } else if (HoodieTableType.MERGE_ON_READ == tableType) {
- String commit = instantToRollback.getTimestamp();
- HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
- switch (instantToRollback.getAction()) {
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
-
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
filesToDelete));
- break;
- case HoodieTimeline.COMPACTION_ACTION:
- // If there is no delta commit present after the current commit
(if compaction), no action, else we
- // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
- // succeeding deltacommit.
- boolean higherDeltaCommits =
-
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
- .empty();
- if (higherDeltaCommits) {
- // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
- // and has not yet finished. In this scenario we should delete
only the newly created base files
- // and not corresponding base commit log files created with
this as baseCommit since updates would
- // have been written to the log files.
-
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
- listFilesToBeDeleted(instantToRollback.getTimestamp(),
baseFileExtension, partitionPath,
- metaClient.getFs())));
- } else {
- // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
- // can also delete any log files that were created with this
compaction commit as base
- // commit.
-
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
filesToDelete));
- }
- break;
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- //
--------------------------------------------------------------------------------------------------
- // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
- //
--------------------------------------------------------------------------------------------------
- // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
- // this scenario we would want to delete these log files.
- // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
- // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
- // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
- // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
- // and hence will end up deleting these log files. This is done
so there are no orphan log files
- // lying around.
- // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
- // taken in this scenario is a combination of (A.2) and (A.3)
- //
---------------------------------------------------------------------------------------------------
- // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
- //
---------------------------------------------------------------------------------------------------
- // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
- // In this scenario, we delete all the base files written for
the failed commit.
- // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
- // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
- // (B.3) Rollback triggered for first commit - Same as (B.1)
- // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
- // as well if the base base file gets deleted.
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
-
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
- HoodieCommitMetadata.class);
-
- // In case all data was inserts and the commit failed, delete
the file belonging to that commit
- // We do not know fileIds for inserts (first inserts are either
log files or base files),
- // delete all files for the corresponding failed commit, if
present (same as COW)
-
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
filesToDelete));
-
- // append rollback blocks for updates and inserts as A.2 and B.2
- if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
- hoodieRollbackRequests.addAll(
- getRollbackRequestToAppend(partitionPath,
instantToRollback, commitMetadata, table));
- }
- break;
- default:
- throw new HoodieRollbackException("Unknown listing type, during
rollback of " + instantToRollback);
- }
- } else {
- throw new HoodieRollbackException(
- String.format("Unsupported table type: %s, during listing
rollback of %s", tableType, instantToRollback));
- }
- return hoodieRollbackRequests.stream();
- }, numPartitions);
- } catch (Exception e) {
+ List<ListingBasedRollbackRequest> rollbackRequests = null;
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
+ table.getMetaClient().getBasePath());
+ } else {
+ rollbackRequests = RollbackUtils
+ .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
+ }
+ List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
+ .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
+ return listingBasedRollbackRequests;
+ } catch (IOException e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
-
- private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
- return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- }
-
- @NotNull
- private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath,
FileStatus[] filesToDeletedStatus) {
- List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus);
- return new HoodieRollbackRequest(
- partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete,
Collections.emptyMap());
- }
-
- @NotNull
- private List<String> getFilesToBeDeleted(FileStatus[]
dataFilesToDeletedStatus) {
- return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
- String dataFileToBeDeleted = fileStatus.getPath().toString();
- // strip scheme E.g: file:/var/folders
- return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") +
1);
- }).collect(Collectors.toList());
- }
-
- private FileStatus[] listFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
- FileSystem fs) throws IOException {
- LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
- PathFilter filter = (path) -> {
- if (path.toString().contains(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
- }
-
- private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
- String baseFileExtension,
HoodieWrapperFileSystem fs,
- Option<HoodieCommitMetadata>
commitMetadataOptional,
- Boolean
isCommitMetadataCompleted) throws IOException {
- if (isCommitMetadataCompleted) {
- return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, fs);
- } else {
- return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, fs);
- }
- }
-
- private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback, String partitionPath,
- String basePath,
HoodieCommitMetadata commitMetadata,
- String baseFileExtension,
HoodieWrapperFileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata,
partitionPath);
-
- return fs.listStatus(filePaths, pathFilter);
- }
-
- private FileStatus[] fetchFilesFromListFiles(HoodieInstant
instantToRollback, String partitionPath, String basePath,
- String baseFileExtension,
HoodieWrapperFileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
-
- return fs.listStatus(filePaths, pathFilter);
- }
-
- private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
- Option<HoodieCommitMetadata>
commitMetadataOptional) {
- return commitMetadataOptional.isPresent() &&
instantToRollback.isCompleted()
- &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
- }
-
- private static Path[] listFilesToBeDeleted(String basePath, String
partitionPath) {
- return new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)};
- }
-
- private static Path[] getFilesFromCommitMetadata(String basePath,
HoodieCommitMetadata commitMetadata, String partitionPath) {
- List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
- return fullPaths.stream().map(Path::new).toArray(Path[]::new);
- }
-
- @NotNull
- private static SerializablePathFilter getSerializablePathFilter(String
basefileExtension, String commit) {
- return (path) -> {
- if (path.toString().endsWith(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
- // Since the baseCommitTime is the only commit for new log files, it's
okay here
- String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- }
-
- public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String
partitionPath, HoodieInstant rollbackInstant,
-
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
- List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
-
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
-
- // wStat.getPrevCommit() might not give the right commit time in the
following
- // scenario : If a compaction was scheduled, the new commitTime associated
with the requested compaction will be
- // used to write the new log files. In this case, the commit time for the
log file is the compaction requested time.
- // But the index (global) might store the baseCommit of the base and not
the requested, hence get the
- // baseCommit always by listing the file slice
- // With multi writers, rollbacks could be lazy. and so we need to use
getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
- Map<String, FileSlice> latestFileSlices = table.getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
rollbackInstant.getTimestamp(), true)
- .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
-
- List<HoodieWriteStat> hoodieWriteStats =
commitMetadata.getPartitionToWriteStats().get(partitionPath)
- .stream()
- .filter(writeStat -> {
- // Filter out stats without prevCommit since they are all inserts
- boolean validForRollback = (writeStat != null) &&
(!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
- && (writeStat.getPrevCommit() != null) &&
latestFileSlices.containsKey(writeStat.getFileId());
-
- if (!validForRollback) {
- return false;
- }
-
- FileSlice latestFileSlice =
latestFileSlices.get(writeStat.getFileId());
-
- // For sanity, log-file base-instant time can never be less than
base-commit on which we are rolling back
- checkArgument(
-
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
- HoodieTimeline.LESSER_THAN_OR_EQUALS,
rollbackInstant.getTimestamp()),
- "Log-file base-instant could not be less than the instant being
rolled back");
-
- // Command block "rolling back" the preceding block {@link
HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
- // w/in the latest file-slice is appended iff base-instant of the
log-file is _strictly_ less
- // than the instant of the Delta Commit being rolled back.
Otherwise, log-file will be cleaned up
- // in a different branch of the flow.
- return
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
- })
- .collect(Collectors.toList());
-
- for (HoodieWriteStat writeStat : hoodieWriteStats) {
- FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
- String fileId = writeStat.getFileId();
- String latestBaseInstant = latestFileSlice.getBaseInstantTime();
-
- Path fullLogFilePath =
FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath());
-
- Map<String, Long> logFilesWithBlocksToRollback =
- Collections.singletonMap(fullLogFilePath.toString(),
writeStat.getTotalWriteBytes());
-
- hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath,
fileId, latestBaseInstant,
- Collections.emptyList(), logFilesWithBlocksToRollback));
- }
-
- return hoodieRollbackRequests;
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index ce7a185151..2bc9b59b0d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -21,13 +21,21 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -36,6 +44,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -91,4 +102,160 @@ public class RollbackUtils {
return new HoodieRollbackStat(stat1.getPartitionPath(),
successDeleteFiles, failedDeleteFiles, commandBlocksCount);
}
+ /**
+ * Generate all rollback requests that needs rolling back this action
without actually performing rollback for COW table type.
+ * @param engineContext instance of {@link HoodieEngineContext} to use.
+ * @param basePath base path of interest.
+ * @return {@link List} of {@link ListingBasedRollbackRequest}s thus
collected.
+ */
+ public static List<ListingBasedRollbackRequest>
generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String
basePath) {
+ return FSUtils.getAllPartitionPaths(engineContext, basePath, false,
false).stream()
+
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Generate all rollback requests that we need to perform for rolling back
this action without actually performing rolling back for MOR table type.
+ *
+ * @param instantToRollback Instant to Rollback
+ * @param table instance of {@link HoodieTable} to use.
+ * @param context instance of {@link HoodieEngineContext} to use.
+ * @return list of rollback requests
+ */
+ public static List<ListingBasedRollbackRequest>
generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback,
HoodieTable table, HoodieEngineContext context) throws IOException {
+ String commit = instantToRollback.getTimestamp();
+ HoodieWriteConfig config = table.getConfig();
+ List<String> partitions = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ if (partitions.isEmpty()) {
+ return new ArrayList<>();
+ }
+ int sparkPartitions = Math.max(Math.min(partitions.size(),
config.getRollbackParallelism()), 1);
+ context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all
rollback requests");
+ return context.flatMap(partitions, partitionPath -> {
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ List<ListingBasedRollbackRequest> partitionRollbackRequests = new
ArrayList<>();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ LOG.info("Rolling back commit action.");
+ partitionRollbackRequests.add(
+
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit (if
compaction), no action, else we
+ // need to make sure that a compaction commit rollback also deletes
any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1).empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with this
as baseCommit since updates would
+ // have been written to the log files.
+ LOG.info("Rolling back compaction. There are higher delta commits.
So only deleting data files");
+ partitionRollbackRequests.add(
+
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath));
+ } else {
+ // No deltacommits present after this compaction commit (inflight
or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ LOG.info("Rolling back compaction plan. There are NO higher delta
commits. So deleting both data and"
+ + " log files");
+ partitionRollbackRequests.add(
+
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if index.canIndexLogFiles
and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files and
HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to log
files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first log
file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were written
to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be `null`
for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done so
there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits - Inserts/Updates
are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if !index.canIndexLogFiles
and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base files
and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for the
failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as (B.2)
plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete the
file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either log
files or base files),
+ // delete all files for the corresponding failed commit, if present
(same as COW)
+ partitionRollbackRequests.add(
+
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ partitionRollbackRequests
+ .addAll(generateAppendRollbackBlocksAction(partitionPath,
instantToRollback, commitMetadata, table));
+ }
+ break;
+ default:
+ break;
+ }
+ return partitionRollbackRequests.stream();
+ }, Math.min(partitions.size(),
sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ private static List<ListingBasedRollbackRequest>
generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant
rollbackInstant,
+ HoodieCommitMetadata commitMetadata, HoodieTable table) {
+
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+
+ // wStat.getPrevCommit() might not give the right commit time in the
following
+ // scenario : If a compaction was scheduled, the new commitTime associated
with the requested compaction will be
+ // used to write the new log files. In this case, the commit time for the
log file is the compaction requested time.
+ // But the index (global) might store the baseCommit of the base and not
the requested, hence get the
+ // baseCommit always by listing the file slice
+ // With multi writers, rollbacks could be lazy. and so we need to use
getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+ Map<String, FileSlice> latestFileSlices = table.getSliceView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
rollbackInstant.getTimestamp(), true)
+ .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
+
+ return commitMetadata.getPartitionToWriteStats().get(partitionPath)
+ .stream()
+ .filter(writeStat -> {
+ // Filter out stats without prevCommit since they are all inserts
+ boolean validForRollback = (writeStat != null) &&
(!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
+ && (writeStat.getPrevCommit() != null) &&
latestFileSlices.containsKey(writeStat.getFileId());
+
+ if (!validForRollback) {
+ return false;
+ }
+
+ FileSlice latestFileSlice =
latestFileSlices.get(writeStat.getFileId());
+
+ // For sanity, log-file base-instant time can never be less than
base-commit on which we are rolling back
+ checkArgument(
+
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
+ HoodieTimeline.LESSER_THAN_OR_EQUALS,
rollbackInstant.getTimestamp()),
+ "Log-file base-instant could not be less than the instant being
rolled back");
+
+ // Command block "rolling back" the preceding block {@link
HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
+ // w/in the latest file-slice is appended iff base-instant of the
log-file is _strictly_ less
+ // than the instant of the Delta Commit being rolled back.
Otherwise, log-file will be cleaned up
+ // in a different branch of the flow.
+ return
HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
+ })
+ .map(writeStat -> {
+ FileSlice latestFileSlice =
latestFileSlices.get(writeStat.getFileId());
+ return
ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath,
+ writeStat.getFileId(), latestFileSlice.getBaseInstantTime(),
writeStat);
+ })
+ .collect(Collectors.toList());
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
deleted file mode 100644
index e2affdf5ca..0000000000
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hadoop.fs.PathFilter;
-
-import java.io.Serializable;
-
-public interface SerializablePathFilter extends PathFilter, Serializable {
-}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 42add690f2..6a114154c8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,14 +18,14 @@
package org.apache.hudi.table.upgrade;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,10 +35,15 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -95,7 +100,14 @@ public class ZeroToOneUpgradeHandler implements
UpgradeHandler {
writeMarkers.quietDeleteMarkerDir(context, parallelism);
// generate rollback stats
- List<HoodieRollbackStat> rollbackStats =
getListBasedRollBackStats(table, context, commitInstantOpt);
+ List<ListingBasedRollbackRequest> rollbackRequests;
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
table.getMetaClient().getBasePath());
+ } else {
+ rollbackRequests =
RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(),
table, context);
+ }
+ List<HoodieRollbackStat> rollbackStats =
getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
+ context, commitInstantOpt, rollbackRequests);
// recreate markers adhering to marker based rollback
for (HoodieRollbackStat rollbackStat : rollbackStats) {
@@ -114,12 +126,12 @@ public class ZeroToOneUpgradeHandler implements
UpgradeHandler {
}
}
- List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?>
table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
- List<HoodieRollbackRequest> hoodieRollbackRequests =
- new ListingBasedRollbackStrategy(table, context, table.getConfig(),
commitInstantOpt.get().getTimestamp())
- .getRollbackRequests(commitInstantOpt.get());
- return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
- .collectRollbackStats(context, commitInstantOpt.get(),
hoodieRollbackRequests);
+ List<HoodieRollbackStat> getListBasedRollBackStats(
+ HoodieTableMetaClient metaClient, HoodieWriteConfig config,
HoodieEngineContext context,
+ Option<HoodieInstant> commitInstantOpt,
List<ListingBasedRollbackRequest> rollbackRequests) {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ListingBasedRollbackHelper(metaClient, config)
+ .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(),
rollbackRequests);
+ return new BaseRollbackHelper(metaClient,
config).collectRollbackStats(context, commitInstantOpt.get(),
hoodieRollbackRequests);
}
/**
@@ -131,7 +143,7 @@ public class ZeroToOneUpgradeHandler implements
UpgradeHandler {
* @param table {@link HoodieTable} instance to use
* @return the marker file name thus curated.
*/
- private static String getFileNameForMarkerFromLogFile(String logFilePath,
HoodieTable<?, ?, ?, ?> table) {
+ private static String getFileNameForMarkerFromLogFile(String logFilePath,
HoodieTable table) {
Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
String fileId = FSUtils.getFileIdFromLogPath(logPath);
String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index d8ce6612a4..c9e3fed871 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -124,8 +125,8 @@ public class TestMergeOnReadRollbackActionExecutor extends
HoodieClientRollbackT
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry :
rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
- assertEquals(0, meta.getFailedDeleteFiles().size());
- assertEquals(0, meta.getSuccessDeleteFiles().size());
+ assertTrue(meta.getFailedDeleteFiles() == null ||
meta.getFailedDeleteFiles().size() == 0);
+ assertTrue(meta.getSuccessDeleteFiles() == null ||
meta.getSuccessDeleteFiles().size() == 0);
}
//4. assert file group after rollback, and compare to the rollbackstat
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 53ceb00409..40bc3c8280 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -138,19 +138,6 @@ public class HoodieCommitMetadata implements Serializable {
return fullPaths;
}
- public List<String> getFullPathsByPartitionPath(String basePath, String
partitionPath) {
- HashSet<String> fullPaths = new HashSet<>();
- if (getPartitionToWriteStats().get(partitionPath) != null) {
- for (HoodieWriteStat stat :
getPartitionToWriteStats().get(partitionPath)) {
- if ((stat.getFileId() != null)) {
- String fullPath = FSUtils.getPartitionPath(basePath,
stat.getPath()).toString();
- fullPaths.add(fullPath);
- }
- }
- }
- return new ArrayList<>(fullPaths);
- }
-
public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String
basePath) {
Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry :
getPartitionToWriteStats().entrySet()) {