nsivabalan commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1313397918
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -95,7 +124,7 @@ public List<HoodieRollbackStat>
collectRollbackStats(HoodieEngineContext context
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, false, parallelism),
+ return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantTime, instantToRollback, serializableRequests, false, parallelism, new
HashSet<>()),
Review Comment:
this method is used only in the 0-to-1 upgrade handler, so we don't need to
worry about log files from previous failed rollbacks.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -75,64 +82,115 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(),
config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(),
config.getRollbackParallelism()), 1);
- return context.map(markerPaths, markerFilePath -> {
+ List<HoodieRollbackRequest> rollbackRequests = context.map(markerPaths,
markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String fileNameWithPartitionToRollback =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePathToRollback = new Path(basePath,
fileNameWithPartitionToRollback);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePathToRollback.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+ HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePathToRollback.toString());
+ fileId = baseFileToDelete.getFileId();
+ baseInstantTime = baseFileToDelete.getCommitTime();
+ } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+ // TODO: HUDI-1517 may distinguish log file created from log
file being appended in the future @guanziyue
+ // Now it should not have create type
+ checkArgument(type != IOType.CREATE, "Log file should not
support create io type now");
+ checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
+ HoodieLogFile logFileToDelete = new
HoodieLogFile(fullFilePathToRollback.toString());
+ fileId = logFileToDelete.getFileId();
+ baseInstantTime = logFileToDelete.getBaseCommitTime();
+ }
+ Objects.requireNonNull(fileId, "Cannot find valid fileId from
path: " + fullFilePathToRollback);
+ Objects.requireNonNull(baseInstantTime, "Cannot find valid base
instant from path: " + fullFilePathToRollback);
+ return new HoodieRollbackRequest(partitionPath, fileId,
baseInstantTime,
+ Collections.singletonList(fullFilePathToRollback.toString()),
Collections.emptyMap());
case APPEND:
- // NOTE: This marker file-path does NOT correspond to a log-file,
but rather is a phony
- // path serving as a "container" for the following
components:
- // - Base file's file-id
- // - Base file's commit instant
- // - Partition path
- return getRollbackRequestForAppend(instantToRollback,
WriteMarkers.stripMarkerSuffix(markerFilePath));
+ HoodieRollbackRequest rollbackRequestForAppend =
getRollbackRequestForAppend(instantToRollback, fileNameWithPartitionToRollback);
+ return rollbackRequestForAppend;
default:
throw new HoodieRollbackException("Unknown marker type, during
rollback of " + instantToRollback);
}
}, parallelism);
+ // we need to ensure we return one rollback request per partition path,
fileId, baseInstant triplet.
+ List<HoodieRollbackRequest> processedRollbackRequests =
context.parallelize(rollbackRequests)
+ .mapToPair(rollbackRequest ->
Pair.of(Pair.of(rollbackRequest.getPartitionPath(),
rollbackRequest.getFileId()), rollbackRequest))
+ .reduceByKey((SerializableBiFunction<HoodieRollbackRequest,
HoodieRollbackRequest, HoodieRollbackRequest>) (hoodieRollbackRequest,
hoodieRollbackRequest2)
+ -> RollbackUtils.mergeRollbackRequest(hoodieRollbackRequest,
hoodieRollbackRequest2), parallelism).values().collectAsList();
+
+ return processedRollbackRequests;
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files
written for " + instantToRollback, e);
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
- Path baseFilePathForAppend = new Path(basePath, markerFilePath);
- String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
- String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
- String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), baseFilePathForAppend.getParent());
- Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
-
- // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
- // block to the latest log-file
- // TODO(HUDI-1517) use provided marker-file's path instead
- Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
- HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
- // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
- if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
- Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
- return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
- Collections.emptyMap());
- }
-
- Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
- if (latestLogFileOption.isPresent()) {
- HoodieLogFile latestLogFile = latestLogFileOption.get();
+ protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String fileNameWithPartitionToRollback) throws IOException {
+ Path filePath = new Path(basePath, fileNameWithPartitionToRollback);
+ String fileId;
+ String baseCommitTime;
+ String relativePartitionPath;
+ Option<HoodieLogFile> latestLogFileOption;
+
+ // Old marker files may be generated from base file name before HUDI-1517.
keep compatible with them.
+ // TODO: deprecated in HUDI-1517, may be removed in the future.
@guanziyue.gzy
+
+ Map<String, Long> logFilesWithBlocksToRollback = new HashMap<>();
+ if (FSUtils.isBaseFile(filePath)) {
+ LOG.warn("Find old marker type for log file: " +
fileNameWithPartitionToRollback);
+ fileId = FSUtils.getFileIdFromFilePath(filePath);
+ baseCommitTime = FSUtils.getCommitTime(filePath.getName());
+ relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), filePath.getParent());
+ Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
+
+ // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
+ // block to the latest log-file
+ try {
+ latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+ HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+ if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+ HoodieLogFile latestLogFile = latestLogFileOption.get();
+ // NOTE: Marker's don't carry information about the cumulative size
of the blocks that have been appended,
+ // therefore we simply stub this value.
+ FileSystem fileSystem = table.getMetaClient().getFs();
+ List<Option<FileStatus>> fileStatuses =
FSUtils.getFileStatusesUnderPartition(fileSystem, filePath.getParent(),
Collections.singletonList(filePath.getName()), true);
Review Comment:
I have rewritten some code snippets here.
We will avoid doing FS calls per log file.
Once we get hold of all the rollbackRequests, we will merge them so that we
have at least one per file slice.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -59,18 +71,20 @@ public class BaseRollbackHelper implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(BaseRollbackHelper.class);
protected static final String EMPTY_STRING = "";
+ protected final HoodieTable table;
protected final HoodieTableMetaClient metaClient;
protected final HoodieWriteConfig config;
- public BaseRollbackHelper(HoodieTableMetaClient metaClient,
HoodieWriteConfig config) {
- this.metaClient = metaClient;
+ public BaseRollbackHelper(HoodieTable table, HoodieWriteConfig config) {
+ this.table = table;
+ this.metaClient = table.getMetaClient();
this.config = config;
}
/**
* Performs all rollback actions that we have collected in parallel.
*/
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
HoodieInstant instantToRollback,
+ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
String instantTime, HoodieInstant instantToRollback,
Review Comment:
this is an optimization.
Let's say
t5.dc
added 1000 log files and then crashed.
Now, we will have 1000 log-file markers.
We trigger the rollback using t6.rb.
During rollback execution, we need to poll markers to find whether there were
any log files from previous rollback attempts. If we add them under t5, we might
have to parse 1000s of log files, compare them with the in-memory rollbackStats
(logFilesFromFailedCommit), and then deduce there are none.
Instead, if we add the markers under t6, it is much simpler. In most cases, it
would return an empty list.
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java:
##########
@@ -25,69 +25,74 @@
*/
public enum StorageSchemes {
// Local filesystem
- FILE("file", false, false, true),
+ FILE("file", false, false, true, false),
// Hadoop File System
- HDFS("hdfs", true, false, true),
+ HDFS("hdfs", true, false, true, true),
// Baidu Advanced File System
- AFS("afs", true, null, null),
+ AFS("afs", true, null, null, null),
// Mapr File System
- MAPRFS("maprfs", true, null, null),
+ MAPRFS("maprfs", true, null, null, null),
// Apache Ignite FS
- IGNITE("igfs", true, null, null),
+ IGNITE("igfs", true, null, null, null),
// AWS S3
- S3A("s3a", false, true, null), S3("s3", false, true, null),
+ S3A("s3a", false, true, null, false), S3("s3", false, true, null, false),
// Google Cloud Storage
- GCS("gs", false, true, null),
+ GCS("gs", false, true, null, null),
// Azure WASB
- WASB("wasb", false, null, null), WASBS("wasbs", false, null, null),
+ WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null,
null),
// Azure ADLS
- ADL("adl", false, null, null),
+ ADL("adl", false, null, null, null),
// Azure ADLS Gen2
- ABFS("abfs", false, null, null), ABFSS("abfss", false, null, null),
+ ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null,
null),
// Aliyun OSS
- OSS("oss", false, null, null),
+ OSS("oss", false, null, null, null),
// View FS for federated setups. If federating across cloud stores, then
append support is false
// View FS support atomic creation
- VIEWFS("viewfs", true, null, true),
+ VIEWFS("viewfs", true, null, true, null),
//ALLUXIO
- ALLUXIO("alluxio", false, null, null),
+ ALLUXIO("alluxio", false, null, null, null),
// Tencent Cloud Object Storage
- COSN("cosn", false, null, null),
+ COSN("cosn", false, null, null, null),
// Tencent Cloud HDFS
- CHDFS("ofs", true, null, null),
+ CHDFS("ofs", true, null, null, null),
// Tencent Cloud CacheFileSystem
- GOOSEFS("gfs", false, null, null),
+ GOOSEFS("gfs", false, null, null, null),
// Databricks file system
- DBFS("dbfs", false, null, null),
+ DBFS("dbfs", false, null, null, null),
// IBM Cloud Object Storage
- COS("cos", false, null, null),
+ COS("cos", false, null, null, null),
// Huawei Cloud Object Storage
- OBS("obs", false, null, null),
+ OBS("obs", false, null, null, null),
// Kingsoft Standard Storage ks3
- KS3("ks3", false, null, null),
+ KS3("ks3", false, null, null, null),
// JuiceFileSystem
- JFS("jfs", true, null, null),
+ JFS("jfs", true, null, null, null),
// Baidu Object Storage
- BOS("bos", false, null, null),
+ BOS("bos", false, null, null, null),
// Oracle Cloud Infrastructure Object Storage
- OCI("oci", false, null, null),
+ OCI("oci", false, null, null, null),
// Volcengine Object Storage
- TOS("tos", false, null, null),
+ TOS("tos", false, null, null, null),
// Volcengine Cloud HDFS
- CFS("cfs", true, null, null);
+ CFS("cfs", true, null, null, null);
private String scheme;
private boolean supportsAppend;
// null for uncertain if write is transactional, please update this for each
FS
private Boolean isWriteTransactional;
// null for uncertain if dfs support atomic create&delete, please update
this for each FS
private Boolean supportAtomicCreation;
+ // list files may bring pressure to storage with centralized meta service
like HDFS.
+ // when we want to get only part of files under a directory rather than all
files, use getStatus may be more friendly than listStatus.
+ // here is a trade-off between rpc times and throughput of storage meta
service
+ private Boolean listStatusUnfriendly;
Review Comment:
Yes. This is to assist in the case where we really need to fetch file statuses
for only a subset of files under a given path — there we could have a different
implementation.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
Review Comment:
This is a generic callback used for the write handle. In reality, it is used
only for the append handle, but the callback is applicable to write handles in
general, so the callback interface resides here, while the callback
implementation lives within the append handle.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat>
performRollback(HoodieEngineContext context, Hoo
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, true, parallelism),
- RollbackUtils::mergeRollbackStat, parallelism);
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+
+ // Considering rollback may failed before, which generated some additional
log files. We need to add these log files back.
+ Set<String> logPaths = new HashSet<>();
+ try {
+ logPaths = markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism());
+ } catch (FileNotFoundException fnf) {
+ LOG.warn("Rollback never failed and hence no marker dir was found.
Safely moving on");
+ } catch (IOException e) {
+ throw new HoodieRollbackException("Failed to list log file markers for
previous attempt of rollback ", e);
+ }
+
+ List<Pair<String, HoodieRollbackStat>> getRollbackStats =
maybeDeleteAndCollectStats(context, instantTime, instantToRollback,
serializableRequests, true, parallelism,
+ logPaths);
+ List<HoodieRollbackStat> mergedRollbackStatByPartitionPath =
context.reduceByKey(getRollbackStats, RollbackUtils::mergeRollbackStat,
parallelism);
Review Comment:
This is nothing new — we already do reduceByKey to ensure we have one
rollback stat per partition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]