nsivabalan commented on a change in pull request #3651:
URL: https://github.com/apache/hudi/pull/3651#discussion_r709251029
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -65,83 +63,32 @@ public AbstractMarkerBasedRollbackStrategy(HoodieTable<T,
I, K, O> table, Hoodie
this.instantTime = instantTime;
}
- protected HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws
IOException {
- LOG.info("Rolling back by deleting the merged base file:" +
mergedBaseFilePath);
- return deleteBaseFile(mergedBaseFilePath);
- }
-
- protected HoodieRollbackStat undoCreate(String createdBaseFilePath) throws
IOException {
- LOG.info("Rolling back by deleting the created base file:" +
createdBaseFilePath);
- return deleteBaseFile(createdBaseFilePath);
- }
-
- private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws
IOException {
- Path fullDeletePath = new Path(basePath, baseFilePath);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
- return HoodieRollbackStat.newBuilder()
- .withPartitionPath(partitionPath)
- .withDeletedFileResult(baseFilePath, isDeleted)
- .build();
- }
-
- protected HoodieRollbackStat undoAppend(String appendBaseFilePath,
HoodieInstant instantToRollback) throws IOException, InterruptedException {
+ protected HoodieRollbackRequest getRollbackRequestForAppend(String
appendBaseFilePath) throws IOException {
Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
- final Map<FileStatus, Long> writtenLogFileSizeMap =
getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-
- HoodieLogFormat.Writer writer = null;
- try {
- Path partitionFullPath = FSUtils.getPartitionPath(basePath,
partitionPath);
-
- if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
- return HoodieRollbackStat.newBuilder()
- .withPartitionPath(partitionPath)
- .build();
- }
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionFullPath)
- .withFileId(fileId)
- .overBaseCommit(baseCommitTime)
- .withFs(table.getMetaClient().getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- Map<HoodieLogBlock.HeaderMetadataType, String> header =
RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
- // if update belongs to an existing log file
- writer.appendBlock(new HoodieCommandBlock(header));
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error closing append of rollback
block..", io);
- }
+ Map<FileStatus, Long> writtenLogFileSizeMap =
getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
+ Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
+ for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet())
{
+ writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(),
entry.getValue());
}
-
- // the information of files appended to is required for metadata sync
- Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
- 1L);
-
- return HoodieRollbackStat.newBuilder()
- .withPartitionPath(partitionPath)
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+ return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime,
Collections.emptyList(), writtenLogFileStrSizeMap);
}
/**
* Returns written log file size map for the respective baseCommitTime to
assist in metadata table syncing.
- * @param partitionPath partition path of interest
+ *
+ * @param partitionPath partition path of interest
* @param baseCommitTime base commit time of interest
- * @param fileId fileId of interest
+ * @param fileId fileId of interest
* @return Map<FileStatus, File size>
* @throws IOException
*/
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String
partitionPath, String baseCommitTime, String fileId) throws IOException {
- return Collections.EMPTY_MAP;
+ // collect all log files that is supposed to be deleted with this rollback
+ return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
Review comment:
I don't fully get this point. I have created a follow-up ticket
[here](https://issues.apache.org/jira/browse/HUDI-2436) and will follow up with
you.
Here is my understanding of a scenario using cloud stores that do not
support append:
If there was a crash during a commit, then when listing log files to be logged,
the last one — the one affected by the crash — may not be part of the rollback
plan. But that should be fine: it is not available via listing anyway, so I
assume it will not be visible during compaction either. We will proceed with
the rollback by adding another log block (file), and this will get replayed to
the metadata table.
If you are talking about the case where a crash happens while the rollback
itself is being logged, just before committing to the metadata table, we
should be okay there too: we will retry the rollback, which will redo the
action phase and add new log blocks (for the same old log files that were part
of the failed writes, except that it may not be able to delete them
successfully). This will then get applied to the metadata table. We just have
to ensure that, when applying changes to the metadata table, we consider all
files from the plan and not just the ones that got successfully deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]