alexeykudinkin commented on a change in pull request #4957:
URL: https://github.com/apache/hudi/pull/4957#discussion_r831683651
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
Review comment:
If we're only accessing the base path, there's no need to pass the whole
`metaClient`, right?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -18,33 +18,60 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
/**
* Listing based rollback strategy to fetch list of {@link
HoodieRollbackRequest}s.
*/
-public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecutor.RollbackStrategy {
+public class ListingBasedRollbackStrategy<T extends HoodieRecordPayload, I, K,
O> implements BaseRollbackPlanActionExecutor.RollbackStrategy {
Review comment:
These generic type parameters should not be necessary.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
Review comment:
Let's extract this scheme-stripping logic into a helper method and handle the
case where the path has no scheme.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
+ return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(),
partitionPath)};
+ }
+
+ private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
Review comment:
Same as above: only the base path is used here, so there's no need to pass the whole `metaClient`.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
+ return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(),
partitionPath)};
+ }
+
+ private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+ List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(),
partitionPath);
+ return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+ }
+
+ @NotNull
+ private static SerializablePathFilter
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ return (path) -> {
+ if (path.toString().endsWith(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ } else if (FSUtils.isLogFile(path)) {
+ // Since the baseCommitTime is the only commit for new log files, it's
okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ }
+
+ private static List<ListingRollbackActionType>
getListingRollbackType(HoodieInstant instantToRollback,
+
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+ List<ListingRollbackActionType> listingRollbackActionTypes = new
ArrayList<>(2);
+ if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE)
{
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
Review comment:
Please apply the same change in all branches.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
Review comment:
We will need to untangle this a bit. I don't think it's possible for a
single commit to write both base files _and_ log files, so rollback should
only need to handle one or the other.
cc @nsivabalan
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
+ return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(),
partitionPath)};
+ }
+
+ private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+ List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(),
partitionPath);
+ return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+ }
+
+ @NotNull
+ private static SerializablePathFilter
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
Review comment:
Please do a holistic evaluation of all the parameters being passed, and let's
make sure we pass only what is actually needed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
+ return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(),
partitionPath)};
+ }
+
+ private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+ List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(),
partitionPath);
+ return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+ }
+
+ @NotNull
+ private static SerializablePathFilter
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ return (path) -> {
+ if (path.toString().endsWith(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ } else if (FSUtils.isLogFile(path)) {
+ // Since the baseCommitTime is the only commit for new log files, it's
okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ }
+
+ private static List<ListingRollbackActionType>
getListingRollbackType(HoodieInstant instantToRollback,
+
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+ List<ListingRollbackActionType> listingRollbackActionTypes = new
ArrayList<>(2);
+ if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE)
{
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
Review comment:
Let's simplify the control flow and just return here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +84,242 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ List<ListingRollbackActionType> types =
getListingRollbackType(instantToRollback, table, partitionPath);
+ for (ListingRollbackActionType type : types) {
+ switch (type) {
+ case DELETE_DATA_FILES_ONLY:
+ final FileStatus[] dataFilesToDeletedStatus =
getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+ partitionPath, metaClient.getFs());
+ List<String> dataFilesToBeDeleted =
Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return
dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new
HoodieRollbackRequest(partitionPath,
+ EMPTY_STRING, EMPTY_STRING, dataFilesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case DELETE_DATA_AND_LOG_FILES:
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ List<String> filesToBeDeleted =
Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+ String fileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":")
+ 1);
+ }).collect(Collectors.toList());
+ hoodieRollbackRequests.add(new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap()));
+ break;
+ case APPEND_ROLLBACK_BLOCK:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+
hoodieRollbackRequests.addAll(getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ break;
+ default:
+ break;
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieWriteConfig config,
+ String commit, String
partitionPath, FileSystem fs) throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(),
partitionPath), filter);
+ }
+
+ private FileStatus[] getBaseAndLogFilesToBeDeleted(HoodieTableMetaClient
metaClient, HoodieInstant instantToRollback, String partitionPath) throws
IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, instantToRollback);
+ SerializablePathFilter pathFilter = getSerializablePathFilter(metaClient,
instantToRollback.getTimestamp());
+ Path[] filePaths;
+ if (commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+ &&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType()))
{
+ filePaths = getFilesFromCommitMetadata(metaClient,
commitMetadataOptional.get(), partitionPath);
+ } else {
+ filePaths = listFilesToBeDeleted(metaClient, partitionPath);
+ }
+
+ return metaClient.getFs().listStatus(filePaths, pathFilter);
+ }
+
+ private static Path[] listFilesToBeDeleted(HoodieTableMetaClient metaClient,
String partitionPath) {
+ return new Path[]{FSUtils.getPartitionPath(metaClient.getBasePath(),
partitionPath)};
+ }
+
+ private static Path[] getFilesFromCommitMetadata(HoodieTableMetaClient
metaClient, HoodieCommitMetadata commitMetadata, String partitionPath) {
+ List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(metaClient.getBasePath(),
partitionPath);
+ return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+ }
+
+ @NotNull
+ private static SerializablePathFilter
getSerializablePathFilter(HoodieTableMetaClient metaClient, String commit) {
+ String basefileExtension =
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ return (path) -> {
+ if (path.toString().endsWith(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ } else if (FSUtils.isLogFile(path)) {
+ // Since the baseCommitTime is the only commit for new log files, it's
okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ }
+
+ private static List<ListingRollbackActionType>
getListingRollbackType(HoodieInstant instantToRollback,
+
HoodieTable<?, ?, ?, ?> table, String partitionPath) throws IOException {
+ List<ListingRollbackActionType> listingRollbackActionTypes = new
ArrayList<>(2);
+ if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE)
{
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit (if
compaction), no action, else we
+ // need to make sure that a compaction commit rollback also deletes
any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1).empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with this
as baseCommit since updates would
+ // have been written to the log files.
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_FILES_ONLY);
+ } else {
+ // No deltacommits present after this compaction commit (inflight
or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+
listingRollbackActionTypes.add(ListingRollbackActionType.DELETE_DATA_AND_LOG_FILES);
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if index.canIndexLogFiles
and/or index.isGlobal
Review comment:
Note to self: we need to review and update these comments, as they seem to
be out of sync with the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]