This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new e3d3677  [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)
e3d3677 is described below

commit e3d3677b7e7899705b624925666317f0c074f7c7
Author: Sivabalan Narayanan <sivab...@uber.com>
AuthorDate: Mon Jan 11 16:23:13 2021 -0500

    [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)

    - Adds a field to RollbackMetadata that captures the log files written for rollback blocks
    - Adds a field to RollbackMetadata that captures new log files written by unsynced deltacommits

    Co-authored-by: Vinoth Chandar <vin...@apache.org>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   1 +
 .../AbstractMarkerBasedRollbackStrategy.java       |  26 +++-
 .../BaseMergeOnReadRollbackActionExecutor.java     |   3 +-
 .../hudi/table/action/rollback/RollbackUtils.java  |   6 +-
 .../rollback/ListingBasedRollbackHelper.java       |  21 +++-
 .../rollback/SparkMarkerBasedRollbackStrategy.java |  14 +++
 .../hudi/metadata/TestHoodieBackedMetadata.java    |  43 ++++---
 .../rollback/TestMarkerBasedRollbackStrategy.java  | 130 +++++++++++++++------
 .../src/main/avro/HoodieRollbackMetadata.avsc      |  16 ++-
 .../org/apache/hudi/common/HoodieRollbackStat.java |  20 +++-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  13 ++-
 .../table/timeline/TimelineMetadataUtils.java      |  28 ++---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  34 ++++--
 .../hudi/common/table/TestTimelineUtils.java       |  27 ++---
 .../table/view/TestIncrementalFSViewSync.java      |   2 +-
 15 files changed, 268 insertions(+), 116 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 3faac2e..c6ea7ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -164,6 +164,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends // Since the actual log file written to can be different based on when rollover happens, we use the // base file to denote some log appends happened on a slice. writeToken will still fence concurrent // writers.
+ // https://issues.apache.org/jira/browse/HUDI-1517 createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())); this.writer = createLogWriter(fileSlice, baseInstantTime); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java index cb6fff9..cc596ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java @@ -53,9 +53,9 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord protected final HoodieWriteConfig config; - private final String basePath; + protected final String basePath; - private final String instantTime; + protected final String instantTime; public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { this.table = table; @@ -90,6 +90,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend); String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName()); String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent()); + final Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId); HoodieLogFormat.Writer writer = null; try { @@ -121,17 +122,26 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord } } - Map<FileStatus, Long> filesToNumBlocksRollback = Collections.emptyMap(); - if (config.useFileListingMetadata()) { - // When metadata is enabled, the information of files appended to is required - filesToNumBlocksRollback = Collections.singletonMap( + // the information of files appended to is required for metadata sync + Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap( table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L); - } return HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath) .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .build(); + .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build(); + } + + /** + * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing. 
+ * @param partitionPath partition path of interest + * @param baseCommitTime base commit time of interest + * @param fileId fileId of interest + * @return Map<FileStatus, File size> + * @throws IOException + */ + protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException { + return Collections.EMPTY_MAP; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java index 44cd5c8..2e75144 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java @@ -28,7 +28,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -57,7 +56,7 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco } @Override - protected List<HoodieRollbackStat> executeRollback() throws IOException { + protected List<HoodieRollbackStat> executeRollback() { HoodieTimer rollbackTimer = new HoodieTimer(); rollbackTimer.startTimer(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 1ca0196..8ddb7e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -74,14 +74,16 @@ public class RollbackUtils { final List<String> successDeleteFiles = new ArrayList<>(); final List<String> failedDeleteFiles = new ArrayList<>(); final Map<FileStatus, Long> commandBlocksCount = new HashMap<>(); - final List<FileStatus> filesToRollback = new ArrayList<>(); + final Map<FileStatus, Long> writtenLogFileSizeMap = new HashMap<>(); Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); - return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); + Option.ofNullable(stat1.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); + Option.ofNullable(stat2.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); + return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index a614aeb..fcb3882 
100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import scala.Tuple2; @@ -116,12 +118,22 @@ public class ListingBasedRollbackHelper implements Serializable { .withDeletedFileResults(filesToDeletedStatus).build()); } case APPEND_ROLLBACK_BLOCK: { + String fileId = rollbackRequest.getFileId().get(); + String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); + + // collect all log files that are supposed to be deleted with this rollback + Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(), + FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), + fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant) + .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); + Writer writer = null; try { writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(rollbackRequest.getFileId().get()) - .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) + .withFileId(fileId) + .overBaseCommit(latestBaseInstant) + .withFs(metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata @@ -149,9 +161,11 @@ public class ListingBasedRollbackHelper implements Serializable { metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L ); + return new Tuple2<>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + .withRollbackBlockAppendResults(filesToNumBlocksRollback) + .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build()); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); @@ -159,7 +173,6 @@ public class ListingBasedRollbackHelper implements Serializable { }); } - /** * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/ diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java index 0598e54..0f36cb8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java @@ -22,7 +22,10 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; @@ -32,10 +35,14 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hadoop.fs.FileStatus; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import scala.Tuple2; @@ -75,4 +82,11 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); } } + + protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { + // collect all log files that are supposed to be deleted with this rollback + return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), + FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) + .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 7e3dea4..3d770c7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieBackedMetadata extends HoodieClientTestHarness { + private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); @TempDir @@ -261,13 +262,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test that rollbacks of various table operations sync to the Metadata Table correctly.
*/ - //@ParameterizedTest - //@EnumSource(HoodieTableType.class) - //public void testRollbackOperations(HoodieTableType tableType) throws Exception { - @Test - public void testRollbackOperations() throws Exception { + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testRollbackOperations(HoodieTableType tableType) throws Exception { //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed - init(HoodieTableType.COPY_ON_WRITE); + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { @@ -371,13 +370,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } /** - * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op - * occurs to metadata. - * @throws Exception + * Test that syncing a rollback to metadata is essentially a no-op if the commit being rolled back has not been synced. + * Once an explicit sync is called, metadata should match. */ - @Test - public void testRollbackUnsyncedCommit() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { @@ -389,7 +388,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertNoWriteErrors(writeStatuses); validateMetadata(client); } - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // Commit with metadata disabled @@ -401,6 +399,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) { + assertFalse(metadata(client).isInSync()); + client.syncTableMetadata(); validateMetadata(client); } } @@ -528,8 +528,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } /** - * Instants on Metadata Table should be archived as per config. - * Metadata Table should be automatically compacted as per config. + * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config. */ @Test public void testCleaningArchivingAndCompaction() throws Exception { @@ -752,8 +751,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Validate the metadata table's contents to ensure they match what is on the file system.
- * - * @throws IOException */ private void validateMetadata(SparkRDDWriteClient client) throws IOException { HoodieWriteConfig config = client.getConfig(); @@ -807,7 +804,19 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); + + for (String fileName : fsFileNames) { + if (!metadataFilenames.contains(fileName)) { + LOG.error(partition + ": FsFilename " + fileName + " not found in metadata"); + } + } + for (String fileName : metadataFilenames) { + if (!fsFileNames.contains(fileName)) { + LOG.error(partition + ": Metadata file " + fileName + " not found in original FS"); + } + } } + assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match"); assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java index 7acff79..83e2b05 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java @@ -18,20 +18,33 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hadoop.fs.FileStatus; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import java.util.Arrays; import java.util.List; import java.util.stream.Stream; @@ -40,13 +53,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enabled={0}"; + + public static Stream<Arguments> configParams() { + return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); + } + + private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; + @BeforeEach public void setUp() throws Exception { initPath(); initSparkContexts(); initFileSystem(); -
initMetaClient(); - initDFS(); + initMetaClient(tableType); + initTestDataGenerator(); } @AfterEach @@ -55,7 +76,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { } @Test - public void testCopyOnWriteRollback() throws Exception { + public void testCopyOnWriteRollbackWithTestTable() throws Exception { // given: wrote some base files and corresponding markers HoodieTestTable testTable = HoodieTestTable.of(metaClient); String f0 = testTable.addRequestedCommit("000") @@ -85,43 +106,78 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum()); } - @Test - public void testMergeOnReadRollback() throws Exception { - // given: wrote some base + log files and corresponding markers - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String f2 = testTable.addRequestedDeltaCommit("000") - .getFileIdsWithBaseFilesInPartitions("partA").get("partA"); - String f1 = testTable.addDeltaCommit("001") - .withLogFile("partA", f2) - .getFileIdsWithBaseFilesInPartitions("partB").get("partB"); - String f3 = "f3"; - String f4 = "f4"; - testTable.forDeltaCommit("001") - .withMarkerFile("partB", f1, IOType.CREATE) - .withMarkerFile("partA", f3, IOType.CREATE) - .withMarkerFile("partA", f2, IOType.APPEND) - .withMarkerFile("partB", f4, IOType.APPEND); + @Tag("functional") + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testCopyOnWriteRollback(boolean useFileListingMetadata) throws Exception { + HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build()) + .withPath(basePath).build(); + + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) { + // rollback 2nd commit and ensure stats reflect the info. + List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient); + + assertEquals(3, stats.size()); + for (HoodieRollbackStat stat : stats) { + assertEquals(1, stat.getSuccessDeleteFiles().size()); + assertEquals(0, stat.getFailedDeleteFiles().size()); + assertEquals(0, stat.getCommandBlocksCount().size()); + assertEquals(0, stat.getWrittenLogFileSizeMap().size()); + } + } + } - // when - List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002") - .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001")); + @Tag("functional") + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testMergeOnReadRollback(boolean useFileListingMetadata) throws Exception { + // init MERGE_ON_READ_TABLE + tearDown(); + tableType = HoodieTableType.MERGE_ON_READ; + setUp(); + + HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build()) + .withPath(basePath).build(); + + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) { + + // rollback 2nd commit and ensure stats reflect the info. 
+ List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient); + + assertEquals(3, stats.size()); + for (HoodieRollbackStat stat : stats) { + assertEquals(0, stat.getSuccessDeleteFiles().size()); + assertEquals(0, stat.getFailedDeleteFiles().size()); + assertEquals(1, stat.getCommandBlocksCount().size()); + stat.getCommandBlocksCount().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); + assertEquals(1, stat.getWrittenLogFileSizeMap().size()); + stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); + } + } + } - // then: ensure files are deleted, rollback block is appended (even if append does not exist) - assertEquals(2, stats.size()); - // will have the log file - FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB"); - assertEquals(1, partBFiles.length); - assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())); - assertTrue(partBFiles[0].getLen() > 0); + private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) { + String newCommitTime = "001"; + writeClient.startCommitWithTime(newCommitTime); - FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA"); - assertEquals(3, partAFiles.length); - assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count()); - assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count()); + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD<WriteStatus> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime); + writeClient.commit(newCommitTime, writeStatuses); - // only partB/f1_001 will be deleted - assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum()); - // partA/f3_001 is non existent - assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum()); + // Updates + newCommitTime = "002"; + writeClient.startCommitWithTime(newCommitTime); + records = dataGen.generateUniqueUpdates(newCommitTime, 50); + writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime); + writeStatuses.collect(); + + // rollback 2nd commit and ensure stats reflect the info. 
+ return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003") + .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002")); } + } diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc index 069881e..a972bfd 100644 --- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc @@ -31,18 +31,24 @@ {"name": "partitionPath", "type": "string"}, {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}}, {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}, - {"name": "appendFiles", "type": { + {"name": "rollbackLogFiles", "type": { "type": "map", - "doc": "Files to which append blocks were written", + "doc": "Files to which append blocks were written to capture the rollback commit", "values": { "type": "long", "doc": "Size of this file in bytes" } + }}, + {"name": "writtenLogFiles", "type": { + "type": "map", + "doc": "Log files written that were expected to be rolled back", + "values": { + "type": "long", + "doc": "Size of this file in bytes" + } }} ] - } - } - }, + }}}, { "name":"version", "type":["int", "null"], diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java index a3191fa..3e4ee34 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java @@ -38,13 +38,16 @@ public class HoodieRollbackStat implements Serializable { private final List<String> failedDeleteFiles; // Count of HoodieLogFile to commandBlocks written for a particular rollback private final Map<FileStatus, Long> commandBlocksCount; + // all log files with the same base instant as the instant being rolled back + private final Map<FileStatus, Long> writtenLogFileSizeMap; public HoodieRollbackStat(String partitionPath, List<String> successDeleteFiles, List<String> failedDeleteFiles, - Map<FileStatus, Long> commandBlocksCount) { + Map<FileStatus, Long> commandBlocksCount, Map<FileStatus, Long> writtenLogFileSizeMap) { this.partitionPath = partitionPath; this.successDeleteFiles = successDeleteFiles; this.failedDeleteFiles = failedDeleteFiles; this.commandBlocksCount = commandBlocksCount; + this.writtenLogFileSizeMap = writtenLogFileSizeMap; } public Map<FileStatus, Long> getCommandBlocksCount() { @@ -63,6 +66,10 @@ public class HoodieRollbackStat implements Serializable { return failedDeleteFiles; } + public Map<FileStatus, Long> getWrittenLogFileSizeMap() { + return writtenLogFileSizeMap; + } + public static HoodieRollbackStat.Builder newBuilder() { return new Builder(); } @@ -75,6 +82,7 @@ public class HoodieRollbackStat implements Serializable { private List<String> successDeleteFiles; private List<String> failedDeleteFiles; private Map<FileStatus, Long> commandBlocksCount; + private Map<FileStatus, Long> writtenLogFileSizeMap; private String partitionPath; public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) { @@ -100,6 +108,11 @@ public class HoodieRollbackStat implements Serializable { return this; } + public Builder withWrittenLogFileSizeMap(Map<FileStatus, Long> writtenLogFileSizeMap) { + this.writtenLogFileSizeMap = writtenLogFileSizeMap; + return this; + } + public Builder withPartitionPath(String partitionPath) {
this.partitionPath = partitionPath; return this; @@ -115,7 +128,10 @@ public class HoodieRollbackStat implements Serializable { if (commandBlocksCount == null) { commandBlocksCount = Collections.EMPTY_MAP; } - return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount); + if (writtenLogFileSizeMap == null) { + writtenLogFileSizeMap = Collections.EMPTY_MAP; + } + return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 7978eed..1990c0a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -45,6 +45,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -426,10 +427,14 @@ public class FSUtils { */ public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { - return Arrays - .stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension))) - .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + try { + return Arrays + .stream(fs.listStatus(partitionPath, + path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension))) + .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + } catch (FileNotFoundException e) { + return Stream.<HoodieLogFile>builder().build(); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 962d69d..9b419ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -18,17 +18,6 @@ package org.apache.hudi.common.table.timeline; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.file.FileReader; -import org.apache.avro.file.SeekableByteArrayInput; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.fs.FileStatus; - import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -44,6 +33,17 @@ import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import 
org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.fs.FileStatus; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; @@ -71,10 +71,12 @@ public class TimelineMetadataUtils { Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>(); int totalDeleted = 0; for (HoodieRollbackStat stat : rollbackStats) { - Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream() + Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream() + .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); + Map<String, Long> probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream() .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(), - stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), appendFiles); + stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles); partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); totalDeleted += stat.getSuccessDeleteFiles().size(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index ed2a878..5942312 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,7 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -34,6 +33,8 @@ import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -44,6 +45,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; import java.util.stream.Collectors; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ -246,12 +248,13 @@ public class HoodieTableMetadataUtil { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { // Has this rollback produced new files? 
- boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0; + boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty(); + boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0; // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata boolean shouldSkip = lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get()); - if (!hasAppendFiles && shouldSkip) { + if (!hasNonZeroRollbackLogFiles && shouldSkip) { LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced up to %s", rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get())); return; @@ -269,16 +272,31 @@ public class HoodieTableMetadataUtil { partitionToDeletedFiles.get(partition).addAll(deletedFiles); } - if (!pm.getAppendFiles().isEmpty()) { + BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> { + // if a file exists in both the written log files and the rollback log files, pick the higher size, + // as the rollback log file could have been appended to after the written log file sizes were computed. + return oldSize > newSizeCopy ? oldSize : newSizeCopy; + }; + + if (hasRollbackLogFiles) { if (!partitionToAppendedFiles.containsKey(partition)) { partitionToAppendedFiles.put(partition, new HashMap<>()); } // Extract appended file name from the absolute paths saved in getAppendFiles() - pm.getAppendFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { - return size + oldSize; - }); + pm.getRollbackLogFiles().forEach((path, size) -> { + partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); + }); + } + + if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) { + if (!partitionToAppendedFiles.containsKey(partition)) { + partitionToAppendedFiles.put(partition, new HashMap<>()); + } + + // Extract appended file name from the absolute paths saved in getWrittenLogFiles() + pm.getWrittenLogFiles().forEach((path, size) -> { + partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); }); } }); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index b79ffbb..18c0d3f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -72,8 +72,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts1); activeTimeline.createNewInstant(instant1); // create replace metadata only with replaced file Ids (no new files created) - activeTimeline.saveAsComplete(instant1, - Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition,2, newFilePartition,0, Collections.emptyMap()))); + activeTimeline.saveAsComplete(instant1, + Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap()))); metaClient.reloadActiveTimeline(); List<String> partitions =
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10)); @@ -85,7 +85,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { activeTimeline.createNewInstant(instant2); // create replace metadata only with replaced file Ids (no new files created) activeTimeline.saveAsComplete(instant2, - Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition,0, newFilePartition,3, Collections.emptyMap()))); + Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap()))); metaClient.reloadActiveTimeline(); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(1, partitions.size()); @@ -96,7 +96,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { assertTrue(partitions.contains(replacePartition)); assertTrue(partitions.contains(newFilePartition)); } - + @Test public void testGetPartitions() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -120,20 +120,20 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { // verify modified partitions included cleaned data List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(5, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4", "5"})); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(4, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"})); + assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"})); // verify only commit actions partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(4, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(3, partitions.size()); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); } @Test @@ -181,10 +181,10 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { // verify modified partitions included cleaned data List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); } @Test @@ -201,7 +201,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); - ts = "1"; + ts = "1"; instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); Map<String, String> 
extraMetadata = new HashMap<>(); @@ -241,7 +241,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { List<HoodieInstant> rollbacks = new ArrayList<>(); rollbacks.add(new HoodieInstant(false, actionType, commitTs)); - HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap()); + HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap(), + Collections.EMPTY_MAP); List<HoodieRollbackStat> rollbackStats = new ArrayList<>(); rollbackStats.add(rollbackStat); return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); @@ -264,7 +265,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { } private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount, - String newFilePartition, int newFileCount, Map<String, String> extraMetadata) + String newFilePartition, int newFileCount, Map<String, String> extraMetadata) throws IOException { HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata(); for (int i = 1; i <= newFileCount; i++) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 5400dc4..146e0bb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -556,7 +556,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { boolean isRestore) throws IOException { Map<String, List<String>> partititonToFiles = deleteFiles(files); List<HoodieRollbackStat> rollbackStats = partititonToFiles.entrySet().stream().map(e -> - new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>()) + new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>(), new HashMap<>()) ).collect(Collectors.toList()); List<HoodieInstant> rollbacks = new ArrayList<>();
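
To make the reconciliation above concrete, here is a minimal, self-contained Java sketch of the size-merge logic that fileMergeFn in HoodieTableMetadataUtil applies when the same log file appears in both of the new HoodieRollbackMetadata maps, rollbackLogFiles and writtenLogFiles. The class name, file name, and sizes below are hypothetical, and the real code keys the maps by absolute path / FileStatus rather than plain file-name strings:

import java.util.HashMap;
import java.util.Map;

public class RollbackLogMergeSketch {
  public static void main(String[] args) {
    // Log files written by the unsynced deltacommit being rolled back: file name -> size in bytes.
    Map<String, Long> writtenLogFiles = new HashMap<>();
    writtenLogFiles.put(".f1_000.log.1_0-1-0", 512L);

    // Log files to which rollback command blocks were appended for the same file slice.
    Map<String, Long> rollbackLogFiles = new HashMap<>();
    rollbackLogFiles.put(".f1_000.log.1_0-1-0", 640L);

    // When a file appears in both maps, keep the larger size: the rollback block is appended
    // after the written log file sizes were captured, so the larger value is the fresher one.
    Map<String, Long> appendedFiles = new HashMap<>(writtenLogFiles);
    rollbackLogFiles.forEach((name, size) -> appendedFiles.merge(name, size, Math::max));

    // Prints: .f1_000.log.1_0-1-0 -> 640
    appendedFiles.forEach((name, size) -> System.out.println(name + " -> " + size));
  }
}

Because the merge function picks a maximum, merging the two maps in either order yields the same result, which is why the patch can apply the same fileMergeFn uniformly in both the rollbackLogFiles and writtenLogFiles passes.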