This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4787076 HUDI-204 : Make MOR rollback idempotent and disable using rolling stats for small file selection (#833)
4787076 is described below
commit 4787076c6d87488c0087ac0cfb7fa86596b1357a
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Aug 13 17:13:30 2019 -0700
HUDI-204 : Make MOR rollback idempotent and disable using rolling stats for small file selection (#833)
---
.../apache/hudi/table/HoodieMergeOnReadTable.java | 92 +++++++---------------
.../apache/hudi/table/TestMergeOnReadTable.java | 25 +++++-
2 files changed, 52 insertions(+), 65 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index ad0424b..e34a124 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -44,8 +44,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
@@ -60,7 +58,6 @@ import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -334,27 +331,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
super.finalizeWrite(jsc, instantTs, stats);
}
- @Override
- protected HoodieRollingStatMetadata getRollingStats() {
- try {
- Option<HoodieInstant> lastInstant =
this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
- .lastInstant();
- if (lastInstant.isPresent()) {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-
this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(),
HoodieCommitMetadata.class);
- Option<String> lastRollingStat =
Option.ofNullable(commitMetadata.getExtraMetadata()
- .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
- if (lastRollingStat.isPresent()) {
- return HoodieCommitMetadata
- .fromBytes(lastRollingStat.get().getBytes(),
HoodieRollingStatMetadata.class);
- }
- }
- return null;
- } catch (IOException e) {
- throw new HoodieException();
- }
- }
-
/**
* UpsertPartitioner for MergeOnRead table type, this allows auto correction
of small parquet
* files to larger ones without the need for an index in the logFile.
@@ -438,18 +414,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}
private long getTotalFileSize(String partitionPath, FileSlice fileSlice) {
- if (rollingStatMetadata != null) {
- Map<String, HoodieRollingStat> partitionRollingStats =
-
rollingStatMetadata.getPartitionToRollingStats().get(partitionPath);
- if (partitionRollingStats != null) {
- HoodieRollingStat rollingStatForFile =
partitionRollingStats.get(fileSlice.getFileId());
- if (rollingStatForFile != null) {
- long inserts = rollingStatForFile.getInserts();
- return averageRecordSize * inserts;
- }
- }
- }
- // In case Rolling Stats is not present, fall back to sizing log files
based on heuristics
if (!fileSlice.getDataFile().isPresent()) {
return
convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
@@ -506,35 +470,37 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}).forEach(wStat -> {
Writer writer = null;
String baseCommitTime =
fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
- boolean success = false;
- try {
- writer = HoodieLogFormat.newWriterBuilder().onParentPath(
- FSUtils.getPartitionPath(this.getMetaClient().getBasePath(),
partitionPath))
- .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
- .withFs(this.metaClient.getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
- // generate metadata
- Map<HeaderMetadataType, String> header = generateHeader(commit);
- // if update belongs to an existing log file
- writer = writer.appendBlock(new HoodieCommandBlock(header));
- success = true;
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException(
- "Failed to rollback for commit " + commit, io);
- } finally {
+ if (null != baseCommitTime) {
+ boolean success = false;
try {
- if (writer != null) {
- writer.close();
- }
- if (success) {
- // This step is intentionally done after writer is closed.
Guarantees that
- // getFileStatus would reflect correct stats and
FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- filesToNumBlocksRollback.put(this.getMetaClient().getFs()
- .getFileStatus(writer.getLogFile().getPath()), 1L);
+ writer = HoodieLogFormat.newWriterBuilder().onParentPath(
+ FSUtils.getPartitionPath(this.getMetaClient().getBasePath(),
partitionPath))
+ .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
+ .withFs(this.metaClient.getFs())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ // generate metadata
+ Map<HeaderMetadataType, String> header = generateHeader(commit);
+ // if update belongs to an existing log file
+ writer = writer.appendBlock(new HoodieCommandBlock(header));
+ success = true;
+ } catch (IOException | InterruptedException io) {
+ throw new HoodieRollbackException(
+ "Failed to rollback for commit " + commit, io);
+ } finally {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ if (success) {
+ // This step is intentionally done after writer is closed.
Guarantees that
+ // getFileStatus would reflect correct stats and
FileNotFoundException is not thrown in
+ // cloud-storage : HUDI-168
+ filesToNumBlocksRollback.put(this.getMetaClient().getFs()
+ .getFileStatus(writer.getLogFile().getPath()), 1L);
+ }
+ } catch (IOException io) {
+ throw new UncheckedIOException(io);
}
- } catch (IOException io) {
- throw new UncheckedIOException(io);
}
}
});
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 98a5d62..93c1f72 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -1024,11 +1024,27 @@ public class TestMergeOnReadTable {
statuses = writeClient.insert(recordsRDD, newCommitTime);
writeClient.commit(newCommitTime, statuses);
- // rollback a successful commit
// Sleep for small interval (at least 1 second) to force a new rollback
start time.
Thread.sleep(1000);
+
+ // We will test HUDI-204 here. We will simulate rollback happening twice
by copying the commit file to local fs
+ // and calling rollback twice
+ final String lastCommitTime = newCommitTime;
+ HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieInstant last =
+ metaClient.getCommitsTimeline().getInstants().filter(instant ->
instant.getTimestamp().equals(lastCommitTime))
+ .findFirst().get();
+ String fileName = last.getFileName();
+ // Save the .commit file to local directory.
+ // Rollback will be called twice to test the case where rollback failed
first time and retried.
+ // We got the "BaseCommitTime cannot be null" exception before the fix
+ TemporaryFolder folder = new TemporaryFolder();
+ folder.create();
+ File file = folder.newFile();
+ metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(),
fileName), new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);
- final HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+
+ metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
@@ -1042,6 +1058,11 @@ public class TestMergeOnReadTable {
fileSlice.getLogFiles().count() > 0).count();
}
Assert.assertTrue(numLogFiles == 0);
+ metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
+ new Path(metaClient.getMetaPath(), fileName));
+ Thread.sleep(1000);
+ // Rollback again to pretend the first rollback failed partially. This
should not error our
+ writeClient.rollback(newCommitTime);
}
@Test