This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 06a365e4768 [HUDI-8123] Fix MDT file listing to exclude non-existent
log files in marker-based rollback (#11830)
06a365e4768 is described below
commit 06a365e47684235ed800ef27a750b8f1609c1c59
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Aug 31 22:54:04 2024 -0700
[HUDI-8123] Fix MDT file listing to exclude non-existent log files in
marker-based rollback (#11830)
---
.../rollback/MarkerBasedRollbackStrategy.java | 30 ++-
.../TestHoodieSparkMergeOnReadTableCompaction.java | 234 +++++++++++++++++++--
.../TestMarkerBasedRollbackStrategy.java | 67 ++++--
.../hudi/common/testutils/FileCreateUtils.java | 8 +-
.../hudi/common/testutils/HoodieTestTable.java | 38 +++-
5 files changed, 337 insertions(+), 40 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 9eede1ca4fc..2b8299b2852 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -151,9 +153,31 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> implements BaseRollbackPlan
         HoodieLogFile logFileToRollback = new HoodieLogFile(fullLogFilePath);
         fileId = logFileToRollback.getFileId();
         baseCommitTime = logFileToRollback.getBaseCommitTime();
-        // NOTE: We don't strictly need the exact size, but this size needs to be positive to pass metadata payload validation.
-        //       Therefore, we simply stub this value (1L), instead of doing a fs call to get the exact size.
-        logBlocksToBeDeleted = Collections.singletonMap(logFileToRollback.getPath().getName(), 1L);
+        try {
+          StoragePathInfo pathInfo = table.getMetaClient().getStorage().getPathInfo(logFileToRollback.getPath());
+          if (pathInfo != null) {
+            if (baseCommitTime.equals(instantToRollback.getTimestamp())) {
+              // delete the log file that creates a new file group
+              return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
+                  Collections.singletonList(logFileToRollback.getPath().toString()),
+                  Collections.emptyMap());
+            }
+            // append a rollback block to the log block that is added to an existing file group
+            logBlocksToBeDeleted = Collections.singletonMap(
+                logFileToRollback.getPath().getName(), pathInfo.getLength());
+          } else {
+            LOG.debug(
+                "File info of {} is null indicating the file does not exist;"
+                    + " there is no need to include it in the rollback.",
+                fullLogFilePath);
+          }
+        } catch (FileNotFoundException e) {
+          LOG.debug(
+              "Log file {} is not found so there is no need to include it in the rollback.",
+              fullLogFilePath);
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to get the file status of " + fullLogFilePath, e);
+        }
}
return new HoodieRollbackRequest(relativePartitionPath, fileId,
baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 4cc2e4edfd4..85a9c6759c2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -19,49 +19,68 @@
package org.apache.hudi.table.functional;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableCompaction extends
SparkClientFunctionalTestHarness {
@@ -124,24 +143,24 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
client = getHoodieWriteClient(config);
// write data and commit
- writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
+ writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
// write data again, and in the case of bucket index, all records will go
into log files (we use a small max_file_size)
- writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
- Assertions.assertEquals(200, readTableTotalRecordsNum());
+ writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+ assertEquals(200, readTableTotalRecordsNum());
// schedule compaction
String compactionTime = (String)
client.scheduleCompaction(Option.empty()).get();
// write data, and do not commit. those records should not visible to
reader
String insertTime = HoodieActiveTimeline.createNewInstantTime();
- List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
- Assertions.assertEquals(200, readTableTotalRecordsNum());
+ List<WriteStatus> writeStatuses = writeData(insertTime, 100, false, false);
+ assertEquals(200, readTableTotalRecordsNum());
// commit the write. The records should be visible now even though the
compaction does not complete.
client.commitStats(insertTime, context().parallelize(writeStatuses, 1),
writeStatuses.stream().map(WriteStatus::getStat)
.collect(Collectors.toList()), Option.empty(),
metaClient.getCommitActionType());
- Assertions.assertEquals(300, readTableTotalRecordsNum());
+ assertEquals(300, readTableTotalRecordsNum());
// after the compaction, total records should remain the same
config.setValue(AUTO_COMMIT_ENABLE, "true");
client.compact(compactionTime);
- Assertions.assertEquals(300, readTableTotalRecordsNum());
+ assertEquals(300, readTableTotalRecordsNum());
}
@ParameterizedTest
@@ -183,7 +202,184 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
- Assertions.assertEquals(100, readTableTotalRecordsNum());
+ assertEquals(100, readTableTotalRecordsNum());
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
+ void testCompactionSchedulingWithUncommittedLogFileOrRollback(boolean
enableMetadataTable,
+ boolean
runRollback) throws IOException {
+ Properties props = getPropertiesForKeyGen(true);
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .forTable("test-trip-table")
+ .withPath(basePath())
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withAutoCommit(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .parquetMaxFileSize(20480).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withHeartbeatIntervalInMs(120000)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ props.putAll(config.getProps());
+
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+ client = getHoodieWriteClient(config);
+
+ // instant 1: write inserts and commit, generating base files
+ String instant1 = HoodieActiveTimeline.createNewInstantTime();
+ writeData(instant1, 100, true, false);
+ assertEquals(100, readTableTotalRecordsNum());
+ validateFileListingInMetadataTable();
+ // instant 2: write updates in log files and simulate failed deltacommit
+ String instant2 = HoodieActiveTimeline.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(instant2, 100, false, true);
+
+ // remove half of the log files written to simulate the failure case
+ // where the marker is created but the log file is not written
+ List<StoragePathInfo> files = hoodieStorage().listFiles(new
StoragePath(basePath()));
+ int numTotalLogFiles = 0;
+ for (StoragePathInfo file : files) {
+ if (file.isFile() && !file.getPath().toString().contains(METAFOLDER_NAME)
+ && FSUtils.isLogFile(file.getPath())) {
+ numTotalLogFiles++;
+ if (numTotalLogFiles % 2 == 0) {
+ hoodieStorage().deleteFile(file.getPath());
+ }
+ }
+ }
+
+ int numLogFilesAfterDeletion = 0;
+ files = hoodieStorage().listFiles(new StoragePath(basePath()));
+ for (StoragePathInfo file : files) {
+ if (file.isFile() && !file.getPath().toString().contains(METAFOLDER_NAME)
+ && FSUtils.isLogFile(file.getPath())) {
+ numLogFilesAfterDeletion++;
+ }
+ }
+
+ // validate the current table state to satisfy what is intended to test
+ if (enableMetadataTable) {
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder()
+
.setConf(storageConf()).setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath())).build();
+ assertEquals(instant1,
metadataMetaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+ }
+ assertTrue(numLogFilesAfterDeletion > 0 && numLogFilesAfterDeletion <
numTotalLogFiles);
+ assertEquals(instant2,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+ assertEquals(instant1,
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+ assertEquals(100, readTableTotalRecordsNum());
+
+ // instant 3: write updates in log files and make a successful deltacommit
+ String instant3 = HoodieActiveTimeline.createNewInstantTime();
+ writeData(instant3, 100, true, true);
+
+ if (runRollback) {
+ // If enabled, rollback the failed delta commit
+ client.rollback(instant2);
+ validateLogFilesExistInRollbackPlan();
+ validateFileListingInMetadataTable();
+ }
+
+ // schedule compaction
+ String compactionInstant = (String)
client.scheduleCompaction(Option.empty()).get();
+ validateFilesExistInCompactionPlan(compactionInstant);
+ if (!runRollback) {
+ // committing instant2 that conflicts with the compaction plan should
fail
+ assertThrows(HoodieWriteConflictException.class, () ->
commitToTable(instant2, writeStatuses2));
+ }
+ config.setValue(AUTO_COMMIT_ENABLE, "true");
+ client.compact(compactionInstant);
+ if (runRollback) {
+ validateFileListingInMetadataTable();
+ }
+ assertEquals(100, readTableTotalRecordsNum());
+ assertEquals(
+ compactionInstant,
+
metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+ }
+
+ private void validateLogFilesExistInRollbackPlan() throws IOException {
+ metaClient.reloadActiveTimeline();
+ HoodieRollbackPlan rollbackPlan =
RollbackUtils.getRollbackPlan(metaClient, metaClient.getActiveTimeline()
+ .filter(e ->
HoodieActiveTimeline.ROLLBACK_ACTION.equals(e.getAction()))
+ .lastInstant().get());
+ assertTrue(rollbackPlan.getRollbackRequests().stream()
+ .map(request -> {
+ boolean allExist = true;
+ StoragePath partitionPath = new StoragePath(basePath(),
request.getPartitionPath());
+ for (String logFile : request.getLogBlocksToBeDeleted().keySet()) {
+ try {
+ allExist = allExist && hoodieStorage().exists(new
StoragePath(partitionPath, logFile));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return allExist;
+ })
+ .reduce(Boolean::logicalAnd)
+ .get());
+ }
+
+ private void validateFilesExistInCompactionPlan(String compactionInstant) {
+ HoodieCompactionPlan compactionPlan =
CompactionUtils.getCompactionPlan(metaClient, compactionInstant);
+ assertTrue(compactionPlan.getOperations().stream()
+ .map(op -> {
+ boolean allExist;
+ StoragePath partitionPath = new StoragePath(basePath(),
op.getPartitionPath());
+ try {
+ allExist = hoodieStorage().exists(new StoragePath(partitionPath,
op.getDataFilePath()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ for (String logFilePath : op.getDeltaFilePaths()) {
+ try {
+ allExist = allExist && hoodieStorage().exists(new
StoragePath(partitionPath, logFilePath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return allExist;
+ })
+ .reduce(Boolean::logicalAnd)
+ .get());
+ }
+
+ private void validateFileListingInMetadataTable() {
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context(),
hoodieStorage(), basePath(), false, false)
+ .stream()
+ .map(e -> new StoragePath(basePath(), e).toString())
+ .collect(Collectors.toList());
+ Map<String, List<StoragePathInfo>> filesFromStorage =
FSUtils.getFilesInPartitions(
+ context(),
+ hoodieStorage(),
+ HoodieMetadataConfig.newBuilder().enable(false).build(),
+ basePath(),
+ partitionPaths.toArray(new String[0]));
+ Map<String, List<StoragePathInfo>> filesFromMetadataTable =
FSUtils.getFilesInPartitions(
+ context(),
+ hoodieStorage(),
+ HoodieMetadataConfig.newBuilder().enable(true).build(),
+ basePath(),
+ partitionPaths.toArray(new String[0]));
+ assertEquals(filesFromStorage.size(), filesFromMetadataTable.size());
+ for (String partition : filesFromStorage.keySet()) {
+ List<StoragePathInfo> partitionFilesFromStorage =
filesFromStorage.get(partition).stream()
+ .sorted().collect(Collectors.toList());
+ List<StoragePathInfo> partitionFilesFromMetadataTable =
filesFromMetadataTable.get(partition).stream()
+ .sorted().collect(Collectors.toList());
+ assertEquals(partitionFilesFromStorage.size(),
partitionFilesFromMetadataTable.size());
+ for (int i = 0; i < partitionFilesFromStorage.size(); i++) {
+ assertEquals(
+ partitionFilesFromStorage.get(i).getPath().toString(),
+ partitionFilesFromMetadataTable.get(i).getPath().toString());
+ }
+ }
}
private long readTableTotalRecordsNum() {
@@ -191,19 +387,29 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
Arrays.stream(dataGen.getPartitionPaths()).map(p ->
Paths.get(basePath(), p).toString()).collect(Collectors.toList()),
basePath()).size();
}
- private List<WriteStatus> writeData(String instant, int numRecords, boolean
doCommit) {
+ private List<WriteStatus> writeData(String instant,
+ int numRecords,
+ boolean doCommit,
+ boolean generateUpdates) throws
IOException {
metaClient = HoodieTableMetaClient.reload(metaClient);
- JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant,
numRecords), 2);
+ List<HoodieRecord> recordList = generateUpdates
+ ? dataGen.generateUpdates(instant, numRecords) :
dataGen.generateInserts(instant, numRecords);
+ JavaRDD records = jsc().parallelize(recordList, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
client.startCommitWithTime(instant);
List<WriteStatus> writeStatuses = client.upsert(records,
instant).collect();
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
- List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
- boolean committed = client.commitStats(instant,
context().parallelize(writeStatuses, 1), writeStats, Option.empty(),
metaClient.getCommitActionType());
- Assertions.assertTrue(committed);
+ commitToTable(instant, writeStatuses);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
return writeStatuses;
}
+
+ private void commitToTable(String instant, List<WriteStatus> writeStatuses) {
+ List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ boolean committed =
+ client.commitStats(instant, context().parallelize(writeStatuses, 1),
writeStats, Option.empty(), metaClient.getCommitActionType());
+ assertTrue(committed);
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 4612e0eeda6..80b12ad91e2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -31,7 +31,9 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
@@ -47,14 +49,17 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Stream;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -95,7 +100,7 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
@@ -104,26 +109,62 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
}
@ParameterizedTest
- @EnumSource(names = {"APPEND"})
- public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType
testIOType) throws Exception {
+ @CsvSource(value = {"APPEND,true,true", "APPEND,true,false",
"APPEND,false,true", "APPEND,false,false"})
+ public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType
testIOType,
+ boolean
logFileInNewFileGroup,
+ boolean
logFileExists) throws Exception {
tearDown();
tableType = HoodieTableType.MERGE_ON_READ;
setUp();
+ String partitionPath = "partA";
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
- String f0 = testTable.addRequestedCommit("000")
- .getFileIdWithLogFile("partA");
- testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, testIOType);
+ String f0 = logFileInNewFileGroup ? UUID.randomUUID().toString()
+ : testTable.addDeltaCommit("000").getFileIdWithLogFile(partitionPath);
+ String logFileName = EMPTY_STRING;
+ int logFileVersion = 1;
+ int logFileSize = 13042;
+ String logFileBaseInstantTime = logFileInNewFileGroup ? "001" : "000";
+ // log file name should still use the base instant time
+ testTable.addInflightDeltaCommit("001")
+ .withLogMarkerFile(logFileBaseInstantTime, partitionPath, f0,
testIOType, logFileVersion);
+ if (logFileExists) {
+ testTable.withLogFilesAndBaseInstantTimeInPartition(
+ partitionPath, Collections.singletonList(
+ Pair.of(Pair.of(logFileBaseInstantTime, f0), new Integer[]
{logFileVersion, logFileSize})));
+ logFileName = testTable.getLogFileNameById(logFileBaseInstantTime, f0,
logFileVersion);
+ }
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002")
.getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
assertEquals(1, rollbackRequests.size());
HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
- assertEquals("partA", rollbackRequest.getPartitionPath());
- assertEquals(f0, rollbackRequest.getFileId());
- assertEquals(testIOType.equals(IOType.CREATE) ? 1 : 0,
rollbackRequest.getFilesToBeDeleted().size());
- assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+ assertEquals(partitionPath, rollbackRequest.getPartitionPath());
+ assertEquals((logFileInNewFileGroup && logFileExists) ? EMPTY_STRING : f0,
rollbackRequest.getFileId());
+ if (logFileExists) {
+ if (logFileInNewFileGroup) {
+ // log file is written to a new file group in the failed instant; the
rollback plan
+ // should include it in the "filesToBeDeleted" field so it is going to
be deleted
+ assertEquals(1, rollbackRequest.getFilesToBeDeleted().size());
+ assertEquals(
+ new StoragePath(new StoragePath(basePath, partitionPath),
logFileName).toString(),
+ rollbackRequest.getFilesToBeDeleted().get(0));
+ assertEquals(0, rollbackRequest.getLogBlocksToBeDeleted().size());
+ } else {
+ // log file is written to an existing file group in the failed
instant; the rollback plan
+ // should include it in the "logBlocksToBeDeleted" field, so it is
kept on storage rolled
+ // back through a command log block
+ assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+ assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+
assertTrue(rollbackRequest.getLogBlocksToBeDeleted().containsKey(logFileName));
+ assertEquals(logFileSize,
rollbackRequest.getLogBlocksToBeDeleted().get(logFileName));
+ }
+ } else {
+ // log file marker exists but the log file is not written to the
storage, so the rollback
+ // plan should not include it.
+ assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+ assertEquals(0, rollbackRequest.getLogBlocksToBeDeleted().size());
+ }
}
@Test
@@ -295,7 +336,7 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 896310f114d..4f5aa34e8c3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -418,14 +418,16 @@ public class FileCreateUtils {
   public static String createLogFileMarker(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType)
       throws IOException {
-    return createLogFileMarker(basePath, partitionPath, instantTime, fileId, ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
+    return createLogFileMarker(basePath, partitionPath, instantTime, instantTime, fileId, ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
   }
-  public static String createLogFileMarker(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType, int logVersion)
+  public static String createLogFileMarker(String basePath, String partitionPath, String baseInstantTime, String instantTime, String fileId,
+                                           IOType ioType,
+                                           int logVersion)
       throws IOException {
     Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
     Files.createDirectories(parentPath);
-    Path markerFilePath = parentPath.resolve(logFileMarkerFileName(instantTime, fileId, ioType, logVersion));
+    Path markerFilePath = parentPath.resolve(logFileMarkerFileName(baseInstantTime, fileId, ioType, logVersion));
     if (Files.notExists(markerFilePath)) {
       Files.createFile(markerFilePath);
     }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 32ca659918a..45f6ebaba14 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -614,8 +614,9 @@ public class HoodieTestTable {
return this;
}
- public HoodieTestTable withLogMarkerFile(String partitionPath, String
fileId, IOType ioType) throws IOException {
- createLogFileMarker(basePath, partitionPath, currentInstantTime, fileId,
ioType);
+ public HoodieTestTable withLogMarkerFile(String baseInstantTime, String
partitionPath, String fileId, IOType ioType, int logVersion)
+ throws IOException {
+ createLogFileMarker(basePath, partitionPath, baseInstantTime,
currentInstantTime, fileId, ioType, logVersion);
return this;
}
@@ -683,9 +684,32 @@ public class HoodieTestTable {
return Pair.of(this, logFiles);
}
+ /**
+ * Writes log files in the partition.
+ *
+ * @param partition partition to write log files
+ * @param fileInfos list of pairs of file ID, log version, and file size of
the log files
+ * @return {@link HoodieTestTable} instance
+ * @throws Exception upon error
+ */
public HoodieTestTable withLogFilesInPartition(String partition,
List<Pair<String, Integer[]>> fileInfos) throws Exception {
- for (Pair<String, Integer[]> fileInfo : fileInfos) {
- FileCreateUtils.createLogFile(basePath, partition, currentInstantTime,
fileInfo.getKey(), fileInfo.getValue()[0], fileInfo.getValue()[1]);
+ return withLogFilesAndBaseInstantTimeInPartition(partition,
+ fileInfos.stream().map(e -> Pair.of(Pair.of(currentInstantTime,
e.getLeft()), e.getRight())).collect(Collectors.toList()));
+ }
+
+ /**
+ * Writes log files in the partition.
+ *
+ * @param partition partition to write log files
+ * @param fileInfos list of pairs of base instant time, file ID, log
version, and file size of the log files
+ * @return {@link HoodieTestTable} instance
+ * @throws Exception upon error
+ */
+ public HoodieTestTable withLogFilesAndBaseInstantTimeInPartition(String
partition, List<Pair<Pair<String, String>, Integer[]>> fileInfos)
+ throws Exception {
+ for (Pair<Pair<String, String>, Integer[]> fileInfo : fileInfos) {
+ FileCreateUtils.createLogFile(
+ basePath, partition, fileInfo.getKey().getKey(),
fileInfo.getKey().getValue(), fileInfo.getValue()[0], fileInfo.getValue()[1]);
}
return this;
}
@@ -767,11 +791,11 @@ public class HoodieTestTable {
}
public Path getLogFilePath(String partition, String fileId, int version) {
- return new Path(Paths.get(basePath, partition, getLogFileNameById(fileId,
version)).toString());
+ return new Path(Paths.get(basePath, partition,
getLogFileNameById(currentInstantTime, fileId, version)).toString());
}
- public String getLogFileNameById(String fileId, int version) {
- return logFileName(currentInstantTime, fileId, version);
+ public String getLogFileNameById(String baseInstantTime, String fileId, int
version) {
+ return logFileName(baseInstantTime, fileId, version);
}
public List<String> getEarliestFilesInPartition(String partition, int count)
throws IOException {