This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4787076  HUDI-204 : Make MOR rollback idempotent and disable using 
rolling stats for small file selection (#833)
4787076 is described below

commit 4787076c6d87488c0087ac0cfb7fa86596b1357a
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Aug 13 17:13:30 2019 -0700

    HUDI-204 : Make MOR rollback idempotent and disable using rolling stats for 
small file selection (#833)
---
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 92 +++++++---------------
 .../apache/hudi/table/TestMergeOnReadTable.java    | 25 +++++-
 2 files changed, 52 insertions(+), 65 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index ad0424b..e34a124 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -44,8 +44,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.SyncableFileSystemView;
@@ -60,7 +58,6 @@ import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -334,27 +331,6 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
     super.finalizeWrite(jsc, instantTs, stats);
   }
 
-  @Override
-  protected HoodieRollingStatMetadata getRollingStats() {
-    try {
-      Option<HoodieInstant> lastInstant = 
this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
-          .lastInstant();
-      if (lastInstant.isPresent()) {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-            
this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), 
HoodieCommitMetadata.class);
-        Option<String> lastRollingStat = 
Option.ofNullable(commitMetadata.getExtraMetadata()
-            .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
-        if (lastRollingStat.isPresent()) {
-          return HoodieCommitMetadata
-              .fromBytes(lastRollingStat.get().getBytes(), 
HoodieRollingStatMetadata.class);
-        }
-      }
-      return null;
-    } catch (IOException e) {
-      throw new HoodieException();
-    }
-  }
-
   /**
    * UpsertPartitioner for MergeOnRead table type, this allows auto correction 
of small parquet
    * files to larger ones without the need for an index in the logFile.
@@ -438,18 +414,6 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
     }
 
     private long getTotalFileSize(String partitionPath, FileSlice fileSlice) {
-      if (rollingStatMetadata != null) {
-        Map<String, HoodieRollingStat> partitionRollingStats =
-            
rollingStatMetadata.getPartitionToRollingStats().get(partitionPath);
-        if (partitionRollingStats != null) {
-          HoodieRollingStat rollingStatForFile = 
partitionRollingStats.get(fileSlice.getFileId());
-          if (rollingStatForFile != null) {
-            long inserts = rollingStatForFile.getInserts();
-            return averageRecordSize * inserts;
-          }
-        }
-      }
-      // In case Rolling Stats is not present, fall back to sizing log files 
based on heuristics
       if (!fileSlice.getDataFile().isPresent()) {
         return 
convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
       } else {
@@ -506,35 +470,37 @@ public class HoodieMergeOnReadTable<T extends 
HoodieRecordPayload> extends
         }).forEach(wStat -> {
           Writer writer = null;
           String baseCommitTime = 
fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
-          boolean success = false;
-          try {
-            writer = HoodieLogFormat.newWriterBuilder().onParentPath(
-                FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), 
partitionPath))
-                .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
-                .withFs(this.metaClient.getFs())
-                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-            // generate metadata
-            Map<HeaderMetadataType, String> header = generateHeader(commit);
-            // if update belongs to an existing log file
-            writer = writer.appendBlock(new HoodieCommandBlock(header));
-            success = true;
-          } catch (IOException | InterruptedException io) {
-            throw new HoodieRollbackException(
-                "Failed to rollback for commit " + commit, io);
-          } finally {
+          if (null != baseCommitTime) {
+            boolean success = false;
             try {
-              if (writer != null) {
-                writer.close();
-              }
-              if (success) {
-                // This step is intentionally done after writer is closed. 
Guarantees that
-                // getFileStatus would reflect correct stats and 
FileNotFoundException is not thrown in
-                // cloud-storage : HUDI-168
-                filesToNumBlocksRollback.put(this.getMetaClient().getFs()
-                    .getFileStatus(writer.getLogFile().getPath()), 1L);
+              writer = HoodieLogFormat.newWriterBuilder().onParentPath(
+                  FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), 
partitionPath))
+                  .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
+                  .withFs(this.metaClient.getFs())
+                  .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+              // generate metadata
+              Map<HeaderMetadataType, String> header = generateHeader(commit);
+              // if update belongs to an existing log file
+              writer = writer.appendBlock(new HoodieCommandBlock(header));
+              success = true;
+            } catch (IOException | InterruptedException io) {
+              throw new HoodieRollbackException(
+                  "Failed to rollback for commit " + commit, io);
+            } finally {
+              try {
+                if (writer != null) {
+                  writer.close();
+                }
+                if (success) {
+                  // This step is intentionally done after writer is closed. 
Guarantees that
+                  // getFileStatus would reflect correct stats and 
FileNotFoundException is not thrown in
+                  // cloud-storage : HUDI-168
+                  filesToNumBlocksRollback.put(this.getMetaClient().getFs()
+                      .getFileStatus(writer.getLogFile().getPath()), 1L);
+                }
+              } catch (IOException io) {
+                throw new UncheckedIOException(io);
               }
-            } catch (IOException io) {
-              throw new UncheckedIOException(io);
             }
           }
         });
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 98a5d62..93c1f72 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -1024,11 +1024,27 @@ public class TestMergeOnReadTable {
     statuses = writeClient.insert(recordsRDD, newCommitTime);
     writeClient.commit(newCommitTime, statuses);
 
-    // rollback a successful commit
     // Sleep for small interval (at least 1 second) to force a new rollback 
start time.
     Thread.sleep(1000);
+
+    // We will test HUDI-204 here. We will simulate rollback happening twice 
by copying the commit file to local fs
+    // and calling rollback twice
+    final String lastCommitTime = newCommitTime;
+    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieInstant last =
+        metaClient.getCommitsTimeline().getInstants().filter(instant -> 
instant.getTimestamp().equals(lastCommitTime))
+            .findFirst().get();
+    String fileName = last.getFileName();
+    // Save the .commit file to local directory.
+    // Rollback will be called twice to test the case where rollback failed 
the first time and was retried.
+    // We got the "BaseCommitTime cannot be null" exception before the fix
+    TemporaryFolder folder = new TemporaryFolder();
+    folder.create();
+    File file = folder.newFile();
+    metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), 
fileName), new Path(file.getAbsolutePath()));
     writeClient.rollback(newCommitTime);
-    final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
 
@@ -1042,6 +1058,11 @@ public class TestMergeOnReadTable {
           fileSlice.getLogFiles().count() > 0).count();
     }
     Assert.assertTrue(numLogFiles == 0);
+    metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
+        new Path(metaClient.getMetaPath(), fileName));
+    Thread.sleep(1000);
+    // Rollback again to pretend the first rollback failed partially. This 
should not error out.
+    writeClient.rollback(newCommitTime);
   }
 
   @Test

Reply via email to