vinothchandar commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554079488
##########
File path: hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
##########
@@ -181,10 +187,113 @@ public void testRestoreInstants() throws Exception {
// verify modified partitions included cleaned data
     List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
     partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
+  }
+
+  @Test
+  public void testHoodieRestoreMetadataSerDeser() throws IOException {
+
+    String partitionPath1 = "/partitionPath1/";
+    String partitionPath2 = "/partitionPath2/";
+    String partitionPath3 = "/partitionPath3/";
+    //prepare HoodieRollbackStat for different partition
+    Map<FileStatus, Boolean> dataFilesOnlyStat1Files = new HashMap<>();
+    dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1.parquet"), true);
+    dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2.parquet"), true);
+    HoodieRollbackStat dataFilesOnlyStat1 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath1)
+        .withDeletedFileResults(dataFilesOnlyStat1Files).build();
+
+    Map<FileStatus, Long> dataFilesOnlyStat2Files = new HashMap<>();
+    dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), 5L);
+    dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), 20L);
+    HoodieRollbackStat dataFilesOnlyStat2 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath2)
+        .withRollbackBlockAppendResults(dataFilesOnlyStat2Files).build();
+
+    Map<FileStatus, Long> dataFilesOnlyStat3Files = new HashMap<>();
+    dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), 100L);
+    dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), 200000L);
+    HoodieRollbackStat dataFilesOnlyStat3 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath3)
+        .withProbableLogFileToSizeMap(dataFilesOnlyStat3Files).build();
+
+    List<HoodieRollbackStat> rollbackStats = new ArrayList<>();
+    rollbackStats.add(dataFilesOnlyStat1);
+    rollbackStats.add(dataFilesOnlyStat2);
+    rollbackStats.add(dataFilesOnlyStat3);
+
+    List<HoodieInstant> instants = new ArrayList<>();
+    for (int i = 1; i <= 5; i++) {
+      String ts = i + "";
+      HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+      instants.add(instant);
+    }
+
+    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata("001", Option.of(1234L),
+        instants, rollbackStats);
+
+    Option<byte[]> ser = TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata);
+    if (ser.isPresent()) {
+      HoodieRollbackMetadata rollbackMetadata1 = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(ser.get());
+      System.out.println("deser successfully ");
+      for (Entry<String, HoodieRollbackPartitionMetadata> rollbackStat : rollbackMetadata1.getPartitionMetadata().entrySet()) {
+        System.out.println("Deser value fro " + rollbackStat.getKey());
+        HoodieRollbackPartitionMetadata stat = rollbackStat.getValue();
+        System.out.println(
+            "1111 Rollback stat for partition " + stat.getPartitionPath() + " success files " + (stat.getSuccessDeleteFiles() != null ? Arrays
+                .toString(stat.getSuccessDeleteFiles().toArray(new String[0])) : "null")
+            + ", delete files :: " + (stat.getSuccessDeleteFiles() != null ? Arrays.toString(stat.getFailedDeleteFiles().toArray(new String[0])) : "null")
+            + ", rollback log files " + stat.getRollbackLogFiles().entrySet()
+            + ", probable/written log files : " + stat.getWrittenLogFiles().entrySet());
+      }
+    }
+
+    System.out.println("---------------------------------------------------------------------------");
+    System.out.println("---------------------------------------------------------------------------");
+    System.out.println("---------------------------------------------------------------------------");
+
+    Map<String, List<HoodieRollbackMetadata>> instantToRollbackMetadata = new HashMap<>();
+    instantToRollbackMetadata.put("abc", Collections.singletonList(rollbackMetadata));
+
+    HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata("002", 10L,
+        instants, instantToRollbackMetadata);
+    ser = TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata);
+    if (ser.isPresent()) {
+      HoodieRestoreMetadata restoreMetadata1 = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(ser.get());
+      System.out.println("deser successfully ");
+
+      for (Entry<String, List<HoodieRollbackMetadata>> entry : restoreMetadata1.getHoodieRestoreMetadata().entrySet()) {
+        List<HoodieRollbackMetadata> rollbackMetadatas = entry.getValue();
+        for (HoodieRollbackMetadata rollbackMetadata2 : rollbackMetadatas) {
+          for (Entry<String, HoodieRollbackPartitionMetadata> rollbackStat : rollbackMetadata2.getPartitionMetadata().entrySet()) {
+            System.out.println("Deser value fro " + rollbackStat.getKey());
+            HoodieRollbackPartitionMetadata stat = rollbackStat.getValue();
+            System.out.println(
Review comment:

   Can you try setting a breakpoint here and see if these fields actually get
   written into the output stream? My guess is they are, but the schema used for
   reading is different. Make sure you recompile the entire project from the
   command line once (`mvn clean install ....`) before running the test via the IDE.
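
   For reference, a minimal sketch of how the round trip could be verified with
   assertions instead of println debugging (assuming the JUnit assertions this
   test class already uses; the method names are taken from the diff above):

   ```java
   // Sketch: serialize, deserialize, and assert, rather than println-debugging.
   Option<byte[]> serialized = TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata);
   assertTrue(serialized.isPresent());
   HoodieRollbackMetadata roundTripped =
       TimelineMetadataUtils.deserializeHoodieRollbackMetadata(serialized.get());
   // If a field is silently dropped during ser/de (e.g. by a stale reader schema),
   // this fails loudly instead of requiring manual inspection of stdout.
   assertEquals(rollbackMetadata.getPartitionMetadata().keySet(),
       roundTripped.getPartitionMetadata().keySet());
   ```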
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
     // Since the actual log file written to can be different based on when rollover happens, we use the
     // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
     // writers.
+    // TODO? are these sufficient?
Review comment:
   Let's answer this, and either resolve it or fix it.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +131,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {
+      builder.withProbableLogFileToSizeMap(probableLogFileMap);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns probable log files for the respective baseCommitTime to assist in metadata table syncing.
+   * @param partitionPath partition path of interest
+   * @param baseCommitTime base commit time of interest
+   * @return Map<FileStatus, File size>
+   * @throws IOException
+   */
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
Review comment:
why probable? is there a chance that this can be wrong?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +130,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {
+      builder.withProbableLogFileToSizeMap(probableLogFileMap);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns probable log files for the respective baseCommitTime to assist in metadata table syncing.
+   * @param partitionPath partition path of interest
+   * @param baseCommitTime base commit time of interest
+   * @return Map<FileStatus, File size>
+   * @throws IOException
+   */
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
+    return null;
Review comment:
Collections.EMPTY_MAP
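
   For example, the default could read as follows (a sketch; `Collections.emptyMap()`
   is the generics-friendly equivalent of the raw `EMPTY_MAP` field):

   ```java
   protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime)
       throws IOException {
     // Empty map instead of null, so callers never need a null guard.
     return Collections.emptyMap();
   }
   ```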
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -794,7 +788,19 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
       if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
         LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
         LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+        for (String fileName : fsFileNames) {
+          if (!metadataFilenames.contains(fileName)) {
+            LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
Review comment:
debug code?
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +216,23 @@
   /**
    * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
    *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                              Map<String, List<String>> partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+      Map<String, List<String>> partitionToDeletedFiles,
+      Map<String, Map<String, Long>> partitionToAppendedFiles,
+      Option<String> lastSyncTs) {
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
-      boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+      boolean hasAppendFiles = pm.getRollbackLogFiles() != null ? pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0 : false;
Review comment:
   Let's have one boolean for each. And can we deal with empty maps instead of nulls?
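
   Something along these lines, say (a sketch, assuming both maps are guaranteed
   non-null, i.e. empty when unused):

   ```java
   // One boolean per map, no null-checking ternary needed.
   boolean hasRollbackLogFiles =
       pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
   boolean hasWrittenLogFiles =
       pm.getWrittenLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
   ```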
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -98,7 +98,7 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
    * @return stats collected with or w/o actual deletions.
    */
   JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests,
-                                                                     int sparkPartitions, boolean doDelete) {
+      int sparkPartitions, boolean doDelete) {
Review comment:
revert this?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +130,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {
Review comment:
we should return an empty map from the method, instead of relying on
null.
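
   With that guarantee, the caller collapses back into a single builder chain (a sketch):

   ```java
   return HoodieRollbackStat.newBuilder()
       .withPartitionPath(partitionPath)
       .withRollbackBlockAppendResults(filesToNumBlocksRollback)
       // Always non-null (possibly empty), so no if-guard is required.
       .withProbableLogFileToSizeMap(probableLogFileMap)
       .build();
   ```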
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -75,4 +84,23 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>,
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
+
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
+    // collect all log files that is supposed to be deleted with this rollback
Review comment:
   This is standard code that's already available; please see the other comment.
##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -31,18 +31,22 @@
         {"name": "partitionPath", "type": "string"},
         {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
         {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
-        {"name": "appendFiles", "type": {
+        {"name": "rollbackLogFiles", "type": {
            "type": "map",
-           "doc": "Files to which append blocks were written",
            "values": {
                "type": "long",
                "doc": "Size of this file in bytes"
            }
+        }},
+        {"name": "writtenLogFiles", "type": {
Review comment:
   Why do we call this `writtenLogFiles` here, while elsewhere we call it `probable...`?
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
             .withDeletedFileResults(filesToDeletedStatus).build());
       }
       case APPEND_ROLLBACK_BLOCK: {
+        // collect all log files that is supposed to be deleted with this rollback
+        String baseCommit = rollbackRequest.getLatestBaseInstant().get();
Review comment:
`baseInstantTime`
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +217,23 @@
   /**
    * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
    *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                              Map<String, List<String>> partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+      Map<String, List<String>> partitionToDeletedFiles,
Review comment:
   Can we revert these unnecessary formatting changes?
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +117,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
             .withDeletedFileResults(filesToDeletedStatus).build());
       }
       case APPEND_ROLLBACK_BLOCK: {
+        // collect all log files that is supposed to be deleted with this rollback
+        String baseCommit = rollbackRequest.getLatestBaseInstant().get();
+        SerializablePathFilter filter = (path) -> {
+          if (FSUtils.isLogFile(path)) {
+            // Since the baseCommitTime is the only commit for new log files, it's okay here
+            String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+            return baseCommit.equals(fileCommitTime);
+          }
+          return false;
+        };
+
+        final Map<FileStatus, Long> probableLogFileMap = new HashMap<>();
+        FileSystem fs = metaClient.getFs();
+        FileStatus[] probableLogFiles = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), filter);
Review comment:
   I guess there is no way to avoid this listing per se.
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -102,8 +102,6 @@
   /**
    * Finds all new files/partitions created as part of commit and creates metadata table records for them.
    *
-   * @param commitMetadata
Review comment:
why these changes?
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -70,18 +70,20 @@ public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbac
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
-      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+      Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+      Map<String, Long> probableLogFiles = stat.getProbableLogFileToSizeMap().keySet().stream()
Review comment:
   Let's stick to one consistent term, please.
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,13 +251,26 @@ private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
-      if (!pm.getAppendFiles().isEmpty()) {
+      if (pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty()) {
         if (!partitionToAppendedFiles.containsKey(partition)) {
           partitionToAppendedFiles.put(partition, new HashMap<>());
         }
         // Extract appended file name from the absolute paths saved in getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
+        pm.getRollbackLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;
Review comment:
   This size needs to be updated, right? Is it correct to add this?
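
   If the map records the current (absolute) size of each log file rather than a
   delta, keeping the newer value is probably the right merge; a sketch, assuming
   sizes are absolute:

   ```java
   // Keep the latest reported size instead of summing old and new.
   partitionToAppendedFiles.get(partition)
       .merge(new Path(path).getName(), size, (oldSize, newSize) -> newSize);
   ```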
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -56,9 +56,9 @@
   private static final Integer DEFAULT_VERSION = 1;

   public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime,
-                                                             long durationInMs,
-                                                             List<HoodieInstant> instants,
-                                                             Map<String, List<HoodieRollbackMetadata>> instantToRollbackMetadata) {
+      long durationInMs,
Review comment:
revert
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
     // Since the actual log file written to can be different based on when rollover happens, we use the
     // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
     // writers.
+    // TODO? are these sufficient?
Review comment:
   The cleanest way is to have a marker per file created. That way we can avoid
   listing anything at all; we can just compute the files from the marker folder directly.
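
   Roughly along these lines (a hypothetical sketch; it assumes a `MarkerFiles`-style
   helper with an `allMarkerFilePaths()` method, which may not match the exact API
   at this revision):

   ```java
   // Hypothetical: derive the written files from the marker folder, no FS listing.
   MarkerFiles markerFiles = new MarkerFiles(table, instantTime);
   for (String markerPath : markerFiles.allMarkerFilePaths()) {
     // Markers would be named "<data-file-name>.marker.<IO_TYPE>"; strip the
     // suffix to recover the data/log file the write actually touched.
     String dataFileName = markerPath.substring(0, markerPath.indexOf(".marker"));
   }
   ```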
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
             .withDeletedFileResults(filesToDeletedStatus).build());
       }
       case APPEND_ROLLBACK_BLOCK: {
+        // collect all log files that is supposed to be deleted with this rollback
+        String baseCommit = rollbackRequest.getLatestBaseInstant().get();
Review comment:
   All of this code is pretty much replaced by `FSUtils.getAllLogFiles()`?
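
   Something like the following, perhaps (a sketch; it assumes
   `FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)`
   returning a `Stream<HoodieLogFile>`, and that the rollback request exposes the file id):

   ```java
   Map<FileStatus, Long> probableLogFileMap = FSUtils
       .getAllLogFiles(metaClient.getFs(),
           FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
           rollbackRequest.getFileId().get(), // assumed to be present on the request
           HoodieLogFile.DELTA_EXTENSION,
           baseCommit)
       .collect(Collectors.toMap(HoodieLogFile::getFileStatus,
           logFile -> logFile.getFileStatus().getLen()));
   ```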
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -70,18 +70,20 @@ public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbac
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
-      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+      Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+      Map<String, Long> probableLogFiles = stat.getProbableLogFileToSizeMap().keySet().stream()
Review comment:
   I think you want to capture the fact that these are all the log files; let's
   just call it `allLogFiles` then.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
     // Since the actual log file written to can be different based on when rollover happens, we use the
     // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
     // writers.
+    // TODO? are these sufficient?
Review comment:
   Let's have a follow-on JIRA for this. We should ideally fix this so that marker
   files exist for all log files, and we are able to just add the actual writtenLogFiles.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]