This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch HUDI-3406-revert in repository https://gitbox.apache.org/repos/asf/hudi.git
commit de4d5aa429f0aaaf1bb86123238fd40c69e49d85 Author: Raymond Xu <[email protected]> AuthorDate: Thu Apr 14 17:03:46 2022 +0800 Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)" This reverts commit 98b4e9796e1e3e1f69954afa698ace5b28bde4a0. --- .../hudi/client/utils/MetadataConversionUtils.java | 17 +- .../table/action/rollback/BaseRollbackHelper.java | 5 + .../rollback/ListingBasedRollbackHelper.java | 150 ++++++++++ .../rollback/ListingBasedRollbackStrategy.java | 302 ++------------------- .../hudi/table/action/rollback/RollbackUtils.java | 167 ++++++++++++ .../action/rollback/SerializablePathFilter.java | 26 -- .../table/upgrade/ZeroToOneUpgradeHandler.java | 34 ++- .../TestMergeOnReadRollbackActionExecutor.java | 5 +- .../hudi/common/model/HoodieCommitMetadata.java | 13 - 9 files changed, 368 insertions(+), 351 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java index 342de74a11..c0405161d8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.client.utils; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; + import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; @@ -33,7 +34,6 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieRollingStatMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; 
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -186,19 +186,6 @@ public class MetadataConversionUtils { return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get())); } - public static Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant hoodieInstant) throws IOException { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - - if (hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - return Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(), - HoodieReplaceCommitMetadata.class)); - } - return Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(), - HoodieCommitMetadata.class)); - - } - public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata( HoodieCommitMetadata hoodieCommitMetadata) { ObjectMapper mapper = new ObjectMapper(); @@ -213,4 +200,4 @@ public class MetadataConversionUtils { avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, ""); return avroMetaData; } -} +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 8475afe16e..189de373d9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -20,6 +20,7 @@ 
package org.apache.hudi.table.action.rollback; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -213,4 +214,8 @@ public class BaseRollbackHelper implements Serializable { String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); return header; } + + public interface SerializablePathFilter extends PathFilter, Serializable { + + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java new file mode 100644 index 0000000000..628b2fc372 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hudi.avro.model.HoodieRollbackRequest; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; + +/** + * Performs Rollback of Hoodie Tables. + */ +public class ListingBasedRollbackHelper implements Serializable { + private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class); + + private final HoodieTableMetaClient metaClient; + private final HoodieWriteConfig config; + + public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { + this.metaClient = metaClient; + this.config = config; + } + + /** + * Collects info for Rollback plan. 
+ */ + public List<HoodieRollbackRequest> getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) { + int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); + context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan"); + return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions); + } + + /** + * May be delete interested files and collect stats or collect stats only. + * + * @param context instance of {@link HoodieEngineContext} to use. + * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. + * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. + * @param numPartitions number of spark partitions to use for parallelism. + * @return stats collected with or w/o actual deletions. + */ + private List<HoodieRollbackRequest> getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback, + List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) { + return context.map(rollbackRequests, rollbackRequest -> { + switch (rollbackRequest.getType()) { + case DELETE_DATA_FILES_ONLY: { + final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(), + rollbackRequest.getPartitionPath(), metaClient.getFs()); + List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> { + String fileToBeDeleted = fileStatus.getPath().toString(); + // strip scheme + return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1); + }).collect(Collectors.toList()); + return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), + EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP); + } + case DELETE_DATA_AND_LOG_FILES: { + final FileStatus[] 
filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs()); + List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> { + String fileToBeDeleted = fileStatus.getPath().toString(); + // strip scheme + return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1); + }).collect(Collectors.toList()); + return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP); + } + case APPEND_ROLLBACK_BLOCK: { + String fileId = rollbackRequest.getFileId().get(); + String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); + HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get(); + + Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath()); + + Map<String, Long> logFilesWithBlocksToRollback = + Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes()); + + return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant, + Collections.EMPTY_LIST, logFilesWithBlocksToRollback); + } + default: + throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); + } + }, numPartitions); + } + + private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, FileSystem fs) throws IOException { + LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit); + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + PathFilter filter = (path) -> { + if (path.toString().contains(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } + return false; + }; + return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter); + } + + private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException { + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + BaseRollbackHelper.SerializablePathFilter filter = (path) -> { + if (path.toString().endsWith(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } else if (FSUtils.isLogFile(path)) { + // Since the baseCommitTime is the only commit for new log files, it's okay here + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); + return commit.equals(fileCommitTime); + } + return false; + }; + return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index ed37798607..e6355526e5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -18,42 +18,19 @@ package org.apache.hudi.table.action.rollback; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import 
org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; -import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; +import java.io.IOException; +import java.util.List; /** * Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s. 
@@ -62,15 +39,12 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class); - protected final HoodieTable<?, ?, ?, ?> table; - - protected final transient HoodieEngineContext context; - + protected final HoodieTable table; + protected final HoodieEngineContext context; protected final HoodieWriteConfig config; - protected final String instantTime; - public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, + public ListingBasedRollbackStrategy(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { @@ -83,260 +57,20 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu @Override public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) { try { - HoodieTableMetaClient metaClient = table.getMetaClient(); - List<String> partitionPaths = - FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false); - int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1); - - context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan"); - - HoodieTableType tableType = table.getMetaClient().getTableType(); - String baseFileExtension = getBaseFileExtension(metaClient); - Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(metaClient, instantToRollback); - Boolean isCommitMetadataCompleted = checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional); - - return context.flatMap(partitionPaths, partitionPath -> { - List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size()); - FileStatus[] filesToDelete = - fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath(), baseFileExtension, - metaClient.getFs(), commitMetadataOptional, isCommitMetadataCompleted); - - if 
(HoodieTableType.COPY_ON_WRITE == tableType) { - hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete)); - } else if (HoodieTableType.MERGE_ON_READ == tableType) { - String commit = instantToRollback.getTimestamp(); - HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.REPLACE_COMMIT_ACTION: - hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = - !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1) - .empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created base files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, - listFilesToBeDeleted(instantToRollback.getTimestamp(), baseFileExtension, partitionPath, - metaClient.getFs()))); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. 
- hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. - // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries. - // In this scenario, we delete all the base files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. 
- // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base base file gets deleted. - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(), - HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or base files), - // delete all files for the corresponding failed commit, if present (same as COW) - hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete)); - - // append rollback blocks for updates and inserts as A.2 and B.2 - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - hoodieRollbackRequests.addAll( - getRollbackRequestToAppend(partitionPath, instantToRollback, commitMetadata, table)); - } - break; - default: - throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback); - } - } else { - throw new HoodieRollbackException( - String.format("Unsupported table type: %s, during listing rollback of %s", tableType, instantToRollback)); - } - return hoodieRollbackRequests.stream(); - }, numPartitions); - } catch (Exception e) { + List<ListingBasedRollbackRequest> rollbackRequests = null; + if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, + table.getMetaClient().getBasePath()); + } else { + rollbackRequests = RollbackUtils + .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context); + } + List<HoodieRollbackRequest> listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config) + .getRollbackRequestsForRollbackPlan(context, 
instantToRollback, rollbackRequests); + return listingBasedRollbackRequests; + } catch (IOException e) { LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e); throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e); } } - - private String getBaseFileExtension(HoodieTableMetaClient metaClient) { - return metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - } - - @NotNull - private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, FileStatus[] filesToDeletedStatus) { - List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus); - return new HoodieRollbackRequest( - partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete, Collections.emptyMap()); - } - - @NotNull - private List<String> getFilesToBeDeleted(FileStatus[] dataFilesToDeletedStatus) { - return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> { - String dataFileToBeDeleted = fileStatus.getPath().toString(); - // strip scheme E.g: file:/var/folders - return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1); - }).collect(Collectors.toList()); - } - - private FileStatus[] listFilesToBeDeleted(String commit, String basefileExtension, String partitionPath, - FileSystem fs) throws IOException { - LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit); - PathFilter filter = (path) -> { - if (path.toString().contains(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } - return false; - }; - return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - } - - private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath, - String baseFileExtension, HoodieWrapperFileSystem fs, - Option<HoodieCommitMetadata> commitMetadataOptional, - 
Boolean isCommitMetadataCompleted) throws IOException { - if (isCommitMetadataCompleted) { - return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(), - baseFileExtension, fs); - } else { - return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, fs); - } - } - - private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollback, String partitionPath, - String basePath, HoodieCommitMetadata commitMetadata, - String baseFileExtension, HoodieWrapperFileSystem fs) - throws IOException { - SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp()); - Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath); - - return fs.listStatus(filePaths, pathFilter); - } - - private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath, - String baseFileExtension, HoodieWrapperFileSystem fs) - throws IOException { - SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp()); - Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath); - - return fs.listStatus(filePaths, pathFilter); - } - - private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback, - Option<HoodieCommitMetadata> commitMetadataOptional) { - return commitMetadataOptional.isPresent() && instantToRollback.isCompleted() - && !WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()); - } - - private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) { - return new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)}; - } - - private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) { - List<String> fullPaths = commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath); - 
return fullPaths.stream().map(Path::new).toArray(Path[]::new); - } - - @NotNull - private static SerializablePathFilter getSerializablePathFilter(String basefileExtension, String commit) { - return (path) -> { - if (path.toString().endsWith(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return commit.equals(fileCommitTime); - } - return false; - }; - } - - public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String partitionPath, HoodieInstant rollbackInstant, - HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) { - List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(); - checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); - - // wStat.getPrevCommit() might not give the right commit time in the following - // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be - // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. - // But the index (global) might store the baseCommit of the base and not the requested, hence get the - // baseCommit always by listing the file slice - // With multi writers, rollbacks could be lazy. 
and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices() - Map<String, FileSlice> latestFileSlices = table.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true) - .collect(Collectors.toMap(FileSlice::getFileId, Function.identity())); - - List<HoodieWriteStat> hoodieWriteStats = commitMetadata.getPartitionToWriteStats().get(partitionPath) - .stream() - .filter(writeStat -> { - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId()); - - if (!validForRollback) { - return false; - } - - FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); - - // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back - checkArgument( - HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()), - "Log-file base-instant could not be less than the instant being rolled back"); - - // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK} - // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less - // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up - // in a different branch of the flow. 
- return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); - }) - .collect(Collectors.toList()); - - for (HoodieWriteStat writeStat : hoodieWriteStats) { - FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); - String fileId = writeStat.getFileId(); - String latestBaseInstant = latestFileSlice.getBaseInstantTime(); - - Path fullLogFilePath = FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath()); - - Map<String, Long> logFilesWithBlocksToRollback = - Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes()); - - hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant, - Collections.emptyList(), logFilesWithBlocksToRollback)); - } - - return hoodieRollbackRequests; - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index ce7a185151..2bc9b59b0d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -21,13 +21,21 @@ package org.apache.hudi.table.action.rollback; import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import 
org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -36,6 +44,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; @@ -91,4 +102,160 @@ public class RollbackUtils { return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } + /** + * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type. + * @param engineContext instance of {@link HoodieEngineContext} to use. + * @param basePath base path of interest. + * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected. + */ + public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) { + return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream() + .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction) + .collect(Collectors.toList()); + } + + /** + * Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type. + * + * @param instantToRollback Instant to Rollback + * @param table instance of {@link HoodieTable} to use. + * @param context instance of {@link HoodieEngineContext} to use. 
+ * @return list of rollback requests + */ + public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException { + String commit = instantToRollback.getTimestamp(); + HoodieWriteConfig config = table.getConfig(); + List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false); + if (partitions.isEmpty()) { + return new ArrayList<>(); + } + int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); + context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests"); + return context.flatMap(partitions, partitionPath -> { + HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); + List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.REPLACE_COMMIT_ACTION: + LOG.info("Rolling back commit action."); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + break; + case HoodieTimeline.COMPACTION_ACTION: + // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. 
In this scenario we should delete only the newly created base files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + } + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. 
+ // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario are a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries. + // In this scenario, we delete all the base files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base file gets deleted. 
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or base files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + + // append rollback blocks for updates and inserts as A.2 and B.2 + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); + } + break; + default: + break; + } + return partitionRollbackRequests.stream(); + }, Math.min(partitions.size(), sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList()); + } + + private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, + HoodieCommitMetadata commitMetadata, HoodieTable table) { + checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + + // wStat.getPrevCommit() might not give the right commit time in the following + // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be + // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. + // But the index (global) might store the baseCommit of the base and not the requested, hence get the + // baseCommit always by listing the file slice + // With multi writers, rollbacks could be lazy. 
and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices() + Map<String, FileSlice> latestFileSlices = table.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true) + .collect(Collectors.toMap(FileSlice::getFileId, Function.identity())); + + return commitMetadata.getPartitionToWriteStats().get(partitionPath) + .stream() + .filter(writeStat -> { + // Filter out stats without prevCommit (null or NULL_COMMIT) since they are all inserts; null is checked before equals() to avoid an NPE + boolean validForRollback = (writeStat != null) && (writeStat.getPrevCommit() != null) + && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) && latestFileSlices.containsKey(writeStat.getFileId()); + + if (!validForRollback) { + return false; + } + + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); + + // For sanity, log-file base-instant time can never be greater than the instant which we are rolling back + checkArgument( + HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), + HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()), + "Log-file base-instant could not be greater than the instant being rolled back"); + + // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK} + // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less + // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up + // in a different branch of the flow. 
+ return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); + }) + .map(writeStat -> { + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); + return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, + writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat); + }) + .collect(Collectors.toList()); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java deleted file mode 100644 index e2affdf5ca..0000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hadoop.fs.PathFilter; - -import java.io.Serializable; - -public interface SerializablePathFilter extends PathFilter, Serializable { -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 42add690f2..6a114154c8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -18,14 +18,14 @@ package org.apache.hudi.table.upgrade; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -35,10 +35,15 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.BaseRollbackHelper; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; +import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; 
import org.apache.hudi.table.marker.WriteMarkersFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -95,7 +100,14 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { writeMarkers.quietDeleteMarkerDir(context, parallelism); // generate rollback stats - List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table, context, commitInstantOpt); + List<ListingBasedRollbackRequest> rollbackRequests; + if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath()); + } else { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); + } + List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(), + context, commitInstantOpt, rollbackRequests); // recreate markers adhering to marker based rollback for (HoodieRollbackStat rollbackStat : rollbackStats) { @@ -114,12 +126,12 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { } } - List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) { - List<HoodieRollbackRequest> hoodieRollbackRequests = - new ListingBasedRollbackStrategy(table, context, table.getConfig(), commitInstantOpt.get().getTimestamp()) - .getRollbackRequests(commitInstantOpt.get()); - return new BaseRollbackHelper(table.getMetaClient(), table.getConfig()) - .collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests); + List<HoodieRollbackStat> getListBasedRollBackStats( + HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, + Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) { + 
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config) + .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests); + return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests); } /** @@ -131,7 +143,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler { * @param table {@link HoodieTable} instance to use * @return the marker file name thus curated. */ - private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable<?, ?, ?, ?> table) { + private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath); String fileId = FSUtils.getFileIdFromLogPath(logPath); String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index d8ce6612a4..c9e3fed871 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.testutils.MetadataMergeWriteStatus; + import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -124,8 +125,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : 
rollbackMetadata.entrySet()) { HoodieRollbackPartitionMetadata meta = entry.getValue(); - assertEquals(0, meta.getFailedDeleteFiles().size()); - assertEquals(0, meta.getSuccessDeleteFiles().size()); + assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0); + assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0); } //4. assert file group after rollback, and compare to the rollbackstat diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 53ceb00409..40bc3c8280 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -138,19 +138,6 @@ public class HoodieCommitMetadata implements Serializable { return fullPaths; } - public List<String> getFullPathsByPartitionPath(String basePath, String partitionPath) { - HashSet<String> fullPaths = new HashSet<>(); - if (getPartitionToWriteStats().get(partitionPath) != null) { - for (HoodieWriteStat stat : getPartitionToWriteStats().get(partitionPath)) { - if ((stat.getFileId() != null)) { - String fullPath = FSUtils.getPartitionPath(basePath, stat.getPath()).toString(); - fullPaths.add(fullPath); - } - } - } - return new ArrayList<>(fullPaths); - } - public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) { Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>(); for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {
