nsivabalan commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554055131
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
##########
@@ -181,10 +187,113 @@ public void testRestoreInstants() throws Exception {
// verify modified partitions included cleaned data
List<String> partitions =
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
10));
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
partitions =
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1",
"4"));
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
+ }
+
+ @Test
+ public void testHoodieRestoreMetadataSerDeser() throws IOException {
Review comment:
Check this test locally to reproduce the serialization and deserialization issue.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
// Since the actual log file written to can be different based on when
rollover happens, we use the
// base file to denote some log appends happened on a slice.
writeToken will still fence concurrent
// writers.
+ // TODO? are these sufficient?
Review comment:
created a follow up ticket
https://issues.apache.org/jira/browse/HUDI-1517. not fixing in this release
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient
metaClient, HoodieWriteC
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ // collect all log files that is supposed to be deleted with this
rollback
+ String baseCommit = rollbackRequest.getLatestBaseInstant().get();
Review comment:
The existing FSUtils.getAllLogFiles() expects a fileId to be passed in. Hence I
have created a new method in FSUtils which lists all log files regardless of fileId,
and then filters them by the base commit passed in.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -794,7 +788,19 @@ private void validateMetadata(SparkRDDWriteClient client)
throws IOException {
if ((fsFileNames.size() != metadataFilenames.size()) ||
(!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " +
Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " +
Arrays.toString(metadataFilenames.toArray()));
+
+ for (String fileName : fsFileNames) {
+ if (!metadataFilenames.contains(fileName)) {
+ LOG.error(partition + "FsFilename " + fileName + " not found in
Meta data");
Review comment:
It's intentional. Instead of printing the entire contents of both the actual and
expected lists, this will print out just the difference between them.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +216,23 @@
/**
* Extracts information about the deleted and append files from the {@code
HoodieRollbackMetadata}.
*
- * During a rollback files may be deleted (COW, MOR) or rollback blocks be
appended (MOR only) to files. This
- * function will extract this change file for each partition.
+ * During a rollback files may be deleted (COW, MOR) or rollback blocks be
appended (MOR only) to files. This function will extract this change file for
each partition.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param partitionToDeletedFiles The {@code Map} to fill with files deleted
per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files
appended per partition and their sizes.
*/
private static void processRollbackMetadata(HoodieRollbackMetadata
rollbackMetadata,
- Map<String, List<String>>
partitionToDeletedFiles,
- Map<String, Map<String, Long>>
partitionToAppendedFiles,
- Option<String> lastSyncTs) {
+ Map<String, List<String>> partitionToDeletedFiles,
+ Map<String, Map<String, Long>> partitionToAppendedFiles,
+ Option<String> lastSyncTs) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
// Has this rollback produced new files?
- boolean hasAppendFiles =
pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+ boolean hasAppendFiles = pm.getRollbackLogFiles() != null ?
pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0
: false;
Review comment:
This null check is needed because of Avro deserialization. I tried to make it
return an empty map instead of null, but that didn't work.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
##########
@@ -102,8 +112,10 @@ public void testMergeOnReadRollback() throws Exception {
.withMarkerFile("partA", f2, IOType.APPEND)
.withMarkerFile("partB", f4, IOType.APPEND);
+ HoodieWriteConfig writeConfig = getConfig();
// when
- List<HoodieRollbackStat> stats = new
SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context,
metaClient), context, getConfig(), "002")
+ HoodieTable morTable = HoodieSparkTable.create(writeConfig, context,
metaClient);
+ List<HoodieRollbackStat> stats = new
SparkMarkerBasedRollbackStrategy(morTable, context, writeConfig, "002")
Review comment:
This test is failing for now.
```
java.lang.IllegalArgumentException: Wrong FS:
file://localhost:53936/user/sivabala/partA, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
at
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
at
org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:427)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
at
org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:580)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:579)
at org.apache.hudi.common.fs.FSUtils.getAllLogFiles(FSUtils.java:428)
at
org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.getWrittenLogFileSizeMap(SparkMarkerBasedRollbackStrategy.java:90)
at
org.apache.hudi.table.action.rollback.AbstractMarkerBasedRollbackStrategy.undoAppend(AbstractMarkerBasedRollbackStrategy.java:93)
at
org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.lambda$execute$d4509179$1(SparkMarkerBasedRollbackStrategy.java:73)
at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]