vinothchandar commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554648625
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -90,6 +90,8 @@ protected HoodieRollbackStat undoAppend(String
appendBaseFilePath, HoodieInstant
String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+ final Map<FileStatus, Long> writtenLogFileSizeMap =
config.useFileListingMetadata()
Review comment:
Hmmm — can we remove this flag, as we discussed before? Using the file
listing flag here is very confusing and crosses a lot of layers. If you are
worried about the new code breaking, would a catch block returning an empty map
be good enough? The downside is that it will make issues here harder to detect.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient
metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this
rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+ if (config.useFileListingMetadata()) {
Review comment:
Same here — let's not reference this config here.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient
metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this
rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+ if (config.useFileListingMetadata()) {
+ Path partitionPath =
FSUtils.getPartitionPath(config.getBasePath(),
rollbackRequest.getPartitionPath());
+ writtenLogFileSizeMap = metaClient.getFs().exists(partitionPath) ?
FSUtils.getAllLogFiles(metaClient.getFs(), partitionPath,
Review comment:
What does `FSUtils.getAllLogFiles` return if the partition does not
exist? Can we avoid the `.exists()` call?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -239,7 +240,8 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
// Has this rollback produced new files?
- boolean hasAppendFiles =
pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+ boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null &&
!pm.getRollbackLogFiles().isEmpty();
+ boolean hasAppendFiles = hasRollbackLogFiles ?
pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0
: false;
Review comment:
Should we rename `hasAppendFiles` to `hasNonZeroRollbackLogFiles`?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -75,4 +86,12 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T,
JavaRDD<HoodieRecord<T>>,
throw new HoodieRollbackException("Error rolling back using marker files
written for " + instantToRollback, e);
}
}
+
+ protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String
partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+ // collect all log files that is supposed to be deleted with this rollback
+ Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
partitionPathStr);
+ return table.getMetaClient().getFs().exists(partitionPath) ?
FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
Review comment:
Same here.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -32,16 +35,24 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import scala.Tuple2;
@SuppressWarnings("checkstyle:LineLength")
public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload>
extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
- public SparkMarkerBasedRollbackStrategy(HoodieTable<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+
+ public SparkMarkerBasedRollbackStrategy(HoodieTable<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieEngineContext context, HoodieWriteConfig config,
Review comment:
Please restore the original formatting.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,18 +264,31 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
- if (!pm.getAppendFiles().isEmpty()) {
+ if (pm.getRollbackLogFiles() != null &&
!pm.getRollbackLogFiles().isEmpty()) {
Review comment:
Isn't this condition the same as `hasRollbackLogFiles`?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,18 +264,31 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
- if (!pm.getAppendFiles().isEmpty()) {
+ if (pm.getRollbackLogFiles() != null &&
!pm.getRollbackLogFiles().isEmpty()) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in
getAppendFiles()
- pm.getAppendFiles().forEach((path, size) -> {
+ pm.getRollbackLogFiles().forEach((path, size) -> {
partitionToAppendedFiles.get(partition).merge(new
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
return size + oldSize;
});
});
}
+
+ if (pm.getWrittenLogFiles() != null &&
!pm.getWrittenLogFiles().isEmpty()) {
+ if (!partitionToAppendedFiles.containsKey(partition)) {
+ partitionToAppendedFiles.put(partition, new HashMap<>());
+ }
+
+ // Extract appended file name from the absolute paths saved in
getWrittenLogFiles()
+ pm.getWrittenLogFiles().forEach((path, size) -> {
+ partitionToAppendedFiles.get(partition).merge(new
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+ return oldSize;
Review comment:
I think we should fix it to take the higher value, uniformly, and avoid
all these caveats. The code will be more maintainable.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,13 +251,26 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
- if (!pm.getAppendFiles().isEmpty()) {
+ if (pm.getRollbackLogFiles() != null &&
!pm.getRollbackLogFiles().isEmpty()) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in
getAppendFiles()
- pm.getAppendFiles().forEach((path, size) -> {
+ pm.getRollbackLogFiles().forEach((path, size) -> {
+ partitionToAppendedFiles.get(partition).merge(new
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+ return size + oldSize;
Review comment:
Still, should we always pick the higher value?
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -430,6 +430,17 @@ public static boolean isLogFile(Path logPath) {
.map(HoodieLogFile::new).filter(s ->
s.getBaseCommitTime().equals(baseCommitTime));
}
+ /**
+ * Get all the log files for the passed in FileId in the partition path.
Review comment:
Could you fix the docs?
##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -31,18 +31,22 @@
{"name": "partitionPath", "type": "string"},
{"name": "successDeleteFiles", "type": {"type": "array", "items":
"string"}},
{"name": "failedDeleteFiles", "type": {"type": "array", "items":
"string"}},
- {"name": "appendFiles", "type": {
+ {"name": "rollbackLogFiles", "type": {
"type": "map",
- "doc": "Files to which append blocks were written",
Review comment:
Why remove the docs? In fact, we should add docs to both fields.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
##########
@@ -40,88 +48,103 @@
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
+ private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with
listing metadata enable={0}";
+
+ public static Stream<Arguments> configParams() {
+ return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
+ }
+
+ private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
initFileSystem();
- initMetaClient();
- initDFS();
+ initMetaClient(tableType);
+ initTestDataGenerator();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
- @Test
- public void testCopyOnWriteRollback() throws Exception {
- // given: wrote some base files and corresponding markers
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
- String f0 = testTable.addRequestedCommit("000")
- .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
- String f1 = testTable.addCommit("001")
- .withBaseFilesInPartition("partA", f0)
- .getFileIdsWithBaseFilesInPartitions("partB").get("partB");
- String f2 = "f2";
- testTable.forCommit("001")
- .withMarkerFile("partA", f0, IOType.MERGE)
- .withMarkerFile("partB", f1, IOType.CREATE)
- .withMarkerFile("partA", f2, IOType.CREATE);
-
- // when
- List<HoodieRollbackStat> stats = new
SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context,
metaClient), context, getConfig(), "002")
- .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, "001"));
-
- // then: ensure files are deleted correctly, non-existent files reported
as failed deletes
- assertEquals(2, stats.size());
-
- FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
- FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
-
- assertEquals(0, partBFiles.length);
- assertEquals(1, partAFiles.length);
- assertEquals(2, stats.stream().mapToInt(r ->
r.getSuccessDeleteFiles().size()).sum());
- assertEquals(1, stats.stream().mapToInt(r ->
r.getFailedDeleteFiles().size()).sum());
+ @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
Review comment:
Yes — but instead of replacing it, can we make yours a functional test
and retain the existing test code?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,6 +119,15 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient
metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this
rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = Collections.EMPTY_MAP;
+ if (config.useFileListingMetadata()) {
+ Path partitionPath =
FSUtils.getPartitionPath(config.getBasePath(),
rollbackRequest.getPartitionPath());
+ writtenLogFileSizeMap = metaClient.getFs().exists(partitionPath) ?
FSUtils.getAllLogFiles(metaClient.getFs(), partitionPath,
Review comment:
Why can't we get the log files only for that fileId here? It seems like
you are fetching all log files in that partition.
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -430,6 +430,17 @@ public static boolean isLogFile(Path logPath) {
.map(HoodieLogFile::new).filter(s ->
s.getBaseCommitTime().equals(baseCommitTime));
}
+ /**
+ * Get all the log files for the passed in FileId in the partition path.
Review comment:
I wonder why we need this method at all?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]