guanziyue commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1307822743
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -75,64 +82,115 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(),
config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(),
config.getRollbackParallelism()), 1);
- return context.map(markerPaths, markerFilePath -> {
+ List<HoodieRollbackRequest> rollbackRequests = context.map(markerPaths,
markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String fileNameWithPartitionToRollback =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePathToRollback = new Path(basePath,
fileNameWithPartitionToRollback);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePathToRollback.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+ HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePathToRollback.toString());
+ fileId = baseFileToDelete.getFileId();
+ baseInstantTime = baseFileToDelete.getCommitTime();
+ } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+ // TODO: HUDI-1517 may distinguish log file created from log
file being appended in the future @guanziyue
+ // Now it should not have create type
+ checkArgument(type != IOType.CREATE, "Log file should not
support create io type now");
+ checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
+ HoodieLogFile logFileToDelete = new
HoodieLogFile(fullFilePathToRollback.toString());
+ fileId = logFileToDelete.getFileId();
+ baseInstantTime = logFileToDelete.getBaseCommitTime();
+ }
+ Objects.requireNonNull(fileId, "Cannot find valid fileId from
path: " + fullFilePathToRollback);
+ Objects.requireNonNull(baseInstantTime, "Cannot find valid base
instant from path: " + fullFilePathToRollback);
+ return new HoodieRollbackRequest(partitionPath, fileId,
baseInstantTime,
+ Collections.singletonList(fullFilePathToRollback.toString()),
Collections.emptyMap());
case APPEND:
- // NOTE: This marker file-path does NOT correspond to a log-file,
but rather is a phony
- // path serving as a "container" for the following
components:
- // - Base file's file-id
- // - Base file's commit instant
- // - Partition path
- return getRollbackRequestForAppend(instantToRollback,
WriteMarkers.stripMarkerSuffix(markerFilePath));
+ HoodieRollbackRequest rollbackRequestForAppend =
getRollbackRequestForAppend(instantToRollback, fileNameWithPartitionToRollback);
+ return rollbackRequestForAppend;
default:
throw new HoodieRollbackException("Unknown marker type, during
rollback of " + instantToRollback);
}
}, parallelism);
+ // we need to ensure we return one rollback request per partition path,
fileId, baseInstant triplet.
+ List<HoodieRollbackRequest> processedRollbackRequests =
context.parallelize(rollbackRequests)
+ .mapToPair(rollbackRequest ->
Pair.of(Pair.of(rollbackRequest.getPartitionPath(),
rollbackRequest.getFileId()), rollbackRequest))
+ .reduceByKey((SerializableBiFunction<HoodieRollbackRequest,
HoodieRollbackRequest, HoodieRollbackRequest>) (hoodieRollbackRequest,
hoodieRollbackRequest2)
+ -> RollbackUtils.mergeRollbackRequest(hoodieRollbackRequest,
hoodieRollbackRequest2), parallelism).values().collectAsList();
+
+ return processedRollbackRequests;
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files
written for " + instantToRollback, e);
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
- Path baseFilePathForAppend = new Path(basePath, markerFilePath);
- String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
- String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
- String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), baseFilePathForAppend.getParent());
- Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
-
- // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
- // block to the latest log-file
- // TODO(HUDI-1517) use provided marker-file's path instead
- Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
- HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
- // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
- if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
- Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
- return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
- Collections.emptyMap());
- }
-
- Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
- if (latestLogFileOption.isPresent()) {
- HoodieLogFile latestLogFile = latestLogFileOption.get();
+ protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String fileNameWithPartitionToRollback) throws IOException {
+ Path filePath = new Path(basePath, fileNameWithPartitionToRollback);
+ String fileId;
+ String baseCommitTime;
+ String relativePartitionPath;
+ Option<HoodieLogFile> latestLogFileOption;
+
+ // Old marker files may be generated from base file name before HUDI-1517.
keep compatible with them.
+ // TODO: deprecated in HUDI-1517, may be removed in the future.
@guanziyue.gzy
+
+ Map<String, Long> logFilesWithBlocksToRollback = new HashMap<>();
+ if (FSUtils.isBaseFile(filePath)) {
+ LOG.warn("Find old marker type for log file: " +
fileNameWithPartitionToRollback);
+ fileId = FSUtils.getFileIdFromFilePath(filePath);
+ baseCommitTime = FSUtils.getCommitTime(filePath.getName());
+ relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), filePath.getParent());
+ Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
+
+ // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
+ // block to the latest log-file
+ try {
+ latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+ HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+ if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+ HoodieLogFile latestLogFile = latestLogFileOption.get();
+ // NOTE: Marker's don't carry information about the cumulative size
of the blocks that have been appended,
+ // therefore we simply stub this value.
+ FileSystem fileSystem = table.getMetaClient().getFs();
+ List<Option<FileStatus>> fileStatuses =
FSUtils.getFileStatusesUnderPartition(fileSystem, filePath.getParent(),
Collections.singletonList(filePath.getName()), true);
Review Comment:
Would it be better to use getFileStatus here, since we only have one file?
FSUtils.getFileStatusesUnderPartition is a method that improves performance
when we are trying to get more than one file under a directory.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
+ // we will create marker file for each log file and include all log files
to commit message
+ // so that all log files generated in this job will be included in commit
metadata. This is important for MDT.
+ // Assuming if spark task failed, the log file generated by it will not
appear in WriteStatus
+
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ return createAppendMarker(logFileToAppend);
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ // TODO: HUDI-1517 may distinguish log file created from log file being
appended in the future @guanziyue
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ return writeMarkers.create(partitionPath, logFileToAppend.getFileName(),
IOType.APPEND,
Review Comment:
Fix this
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat>
performRollback(HoodieEngineContext context, Hoo
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, true, parallelism),
- RollbackUtils::mergeRollbackStat, parallelism);
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+
+ // Considering rollback may failed before, which generated some additional
log files. We need to add these log files back.
+ Set<String> logPaths = new HashSet<>();
+ try {
+ logPaths = markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism());
Review Comment:
Hi codope, we do need a list operation here to find all the log files we may
have generated before. This will return quickly when no previous rollback has
failed.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -59,18 +71,20 @@ public class BaseRollbackHelper implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(BaseRollbackHelper.class);
protected static final String EMPTY_STRING = "";
+ protected final HoodieTable table;
protected final HoodieTableMetaClient metaClient;
protected final HoodieWriteConfig config;
- public BaseRollbackHelper(HoodieTableMetaClient metaClient,
HoodieWriteConfig config) {
- this.metaClient = metaClient;
+ public BaseRollbackHelper(HoodieTable table, HoodieWriteConfig config) {
+ this.table = table;
+ this.metaClient = table.getMetaClient();
this.config = config;
}
/**
* Performs all rollback actions that we have collected in parallel.
*/
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
HoodieInstant instantToRollback,
+ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
String instantTime, HoodieInstant instantToRollback,
Review Comment:
Are we going to generate markers with the rollback instant time rather than the
delta commit instant time? Just curious about the reason — is it because we can
get better performance this way?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]