This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 06a365e4768 [HUDI-8123] Fix MDT file listing to exclude non-existent 
log files in marker-based rollback (#11830)
06a365e4768 is described below

commit 06a365e47684235ed800ef27a750b8f1609c1c59
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Aug 31 22:54:04 2024 -0700

    [HUDI-8123] Fix MDT file listing to exclude non-existent log files in 
marker-based rollback (#11830)
---
 .../rollback/MarkerBasedRollbackStrategy.java      |  30 ++-
 .../TestHoodieSparkMergeOnReadTableCompaction.java | 234 +++++++++++++++++++--
 .../TestMarkerBasedRollbackStrategy.java           |  67 ++++--
 .../hudi/common/testutils/FileCreateUtils.java     |   8 +-
 .../hudi/common/testutils/HoodieTestTable.java     |  38 +++-
 5 files changed, 337 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 9eede1ca4fc..2b8299b2852 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -151,9 +153,31 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
       HoodieLogFile logFileToRollback = new HoodieLogFile(fullLogFilePath);
       fileId = logFileToRollback.getFileId();
       baseCommitTime = logFileToRollback.getBaseCommitTime();
-      // NOTE: We don't strictly need the exact size, but this size needs to 
be positive to pass metadata payload validation.
-      //       Therefore, we simply stub this value (1L), instead of doing a 
fs call to get the exact size.
-      logBlocksToBeDeleted = 
Collections.singletonMap(logFileToRollback.getPath().getName(), 1L);
+      try {
+        StoragePathInfo pathInfo = 
table.getMetaClient().getStorage().getPathInfo(logFileToRollback.getPath());
+        if (pathInfo != null) {
+          if (baseCommitTime.equals(instantToRollback.getTimestamp())) {
+            // delete the log file that creates a new file group
+            return new HoodieRollbackRequest(relativePartitionPath, 
EMPTY_STRING, EMPTY_STRING,
+                
Collections.singletonList(logFileToRollback.getPath().toString()),
+                Collections.emptyMap());
+          }
+          // append a rollback block to the log file that is added to an 
existing file group
+          logBlocksToBeDeleted = Collections.singletonMap(
+              logFileToRollback.getPath().getName(), pathInfo.getLength());
+        } else {
+          LOG.debug(
+              "File info of {} is null indicating the file does not exist;"
+                  + " there is no need to include it in the rollback.",
+              fullLogFilePath);
+        }
+      } catch (FileNotFoundException e) {
+        LOG.debug(
+            "Log file {} is not found so there is no need to include it in the 
rollback.",
+            fullLogFilePath);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to get the file status of " + 
fullLogFilePath, e);
+      }
     }
     return new HoodieRollbackRequest(relativePartitionPath, fileId, 
baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 4cc2e4edfd4..85a9c6759c2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -19,49 +19,68 @@
 
 package org.apache.hudi.table.functional;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends 
SparkClientFunctionalTestHarness {
@@ -124,24 +143,24 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     client = getHoodieWriteClient(config);
 
     // write data and commit
-    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
     // write data again, and in the case of bucket index, all records will go 
into log files (we use a small max_file_size)
-    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true);
-    Assertions.assertEquals(200, readTableTotalRecordsNum());
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+    assertEquals(200, readTableTotalRecordsNum());
     // schedule compaction
     String compactionTime = (String) 
client.scheduleCompaction(Option.empty()).get();
     // write data, and do not commit. those records should not visible to 
reader
     String insertTime = HoodieActiveTimeline.createNewInstantTime();
-    List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
-    Assertions.assertEquals(200, readTableTotalRecordsNum());
+    List<WriteStatus> writeStatuses = writeData(insertTime, 100, false, false);
+    assertEquals(200, readTableTotalRecordsNum());
     // commit the write. The records should be visible now even though the 
compaction does not complete.
     client.commitStats(insertTime, context().parallelize(writeStatuses, 1), 
writeStatuses.stream().map(WriteStatus::getStat)
         .collect(Collectors.toList()), Option.empty(), 
metaClient.getCommitActionType());
-    Assertions.assertEquals(300, readTableTotalRecordsNum());
+    assertEquals(300, readTableTotalRecordsNum());
     // after the compaction, total records should remain the same
     config.setValue(AUTO_COMMIT_ENABLE, "true");
     client.compact(compactionTime);
-    Assertions.assertEquals(300, readTableTotalRecordsNum());
+    assertEquals(300, readTableTotalRecordsNum());
   }
 
   @ParameterizedTest
@@ -183,7 +202,184 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
     client.delete(deleteRecords, client.startCommit());
     // insert the same 100 records again
     client.upsert(writeRecords, client.startCommit());
-    Assertions.assertEquals(100, readTableTotalRecordsNum());
+    assertEquals(100, readTableTotalRecordsNum());
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
+  void testCompactionSchedulingWithUncommittedLogFileOrRollback(boolean 
enableMetadataTable,
+                                                                boolean 
runRollback) throws IOException {
+    Properties props = getPropertiesForKeyGen(true);
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .forTable("test-trip-table")
+        .withPath(basePath())
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetMaxFileSize(20480).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withHeartbeatIntervalInMs(120000)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+    client = getHoodieWriteClient(config);
+
+    // instant 1: write inserts and commit, generating base files
+    String instant1 = HoodieActiveTimeline.createNewInstantTime();
+    writeData(instant1, 100, true, false);
+    assertEquals(100, readTableTotalRecordsNum());
+    validateFileListingInMetadataTable();
+    // instant 2: write updates in log files and simulate failed deltacommit
+    String instant2 = HoodieActiveTimeline.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(instant2, 100, false, true);
+
+    // remove half of the log files written to simulate the failure case
+    // where the marker is created but the log file is not written
+    List<StoragePathInfo> files = hoodieStorage().listFiles(new 
StoragePath(basePath()));
+    int numTotalLogFiles = 0;
+    for (StoragePathInfo file : files) {
+      if (file.isFile() && !file.getPath().toString().contains(METAFOLDER_NAME)
+          && FSUtils.isLogFile(file.getPath())) {
+        numTotalLogFiles++;
+        if (numTotalLogFiles % 2 == 0) {
+          hoodieStorage().deleteFile(file.getPath());
+        }
+      }
+    }
+
+    int numLogFilesAfterDeletion = 0;
+    files = hoodieStorage().listFiles(new StoragePath(basePath()));
+    for (StoragePathInfo file : files) {
+      if (file.isFile() && !file.getPath().toString().contains(METAFOLDER_NAME)
+          && FSUtils.isLogFile(file.getPath())) {
+        numLogFilesAfterDeletion++;
+      }
+    }
+
+    // validate that the current table state matches the scenario this test intends to cover
+    if (enableMetadataTable) {
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+          
.setConf(storageConf()).setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath())).build();
+      assertEquals(instant1, 
metadataMetaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    }
+    assertTrue(numLogFilesAfterDeletion > 0 && numLogFilesAfterDeletion < 
numTotalLogFiles);
+    assertEquals(instant2, 
metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    assertEquals(instant1, 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+    assertEquals(100, readTableTotalRecordsNum());
+
+    // instant 3: write updates in log files and make a successful deltacommit
+    String instant3 = HoodieActiveTimeline.createNewInstantTime();
+    writeData(instant3, 100, true, true);
+
+    if (runRollback) {
+      // If enabled, rollback the failed delta commit
+      client.rollback(instant2);
+      validateLogFilesExistInRollbackPlan();
+      validateFileListingInMetadataTable();
+    }
+
+    // schedule compaction
+    String compactionInstant = (String) 
client.scheduleCompaction(Option.empty()).get();
+    validateFilesExistInCompactionPlan(compactionInstant);
+    if (!runRollback) {
+      // committing instant2 that conflicts with the compaction plan should 
fail
+      assertThrows(HoodieWriteConflictException.class, () -> 
commitToTable(instant2, writeStatuses2));
+    }
+    config.setValue(AUTO_COMMIT_ENABLE, "true");
+    client.compact(compactionInstant);
+    if (runRollback) {
+      validateFileListingInMetadataTable();
+    }
+    assertEquals(100, readTableTotalRecordsNum());
+    assertEquals(
+        compactionInstant,
+        
metaClient.reloadActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+  }
+
+  private void validateLogFilesExistInRollbackPlan() throws IOException {
+    metaClient.reloadActiveTimeline();
+    HoodieRollbackPlan rollbackPlan = 
RollbackUtils.getRollbackPlan(metaClient, metaClient.getActiveTimeline()
+        .filter(e -> 
HoodieActiveTimeline.ROLLBACK_ACTION.equals(e.getAction()))
+        .lastInstant().get());
+    assertTrue(rollbackPlan.getRollbackRequests().stream()
+        .map(request -> {
+          boolean allExist = true;
+          StoragePath partitionPath = new StoragePath(basePath(), 
request.getPartitionPath());
+          for (String logFile : request.getLogBlocksToBeDeleted().keySet()) {
+            try {
+              allExist = allExist && hoodieStorage().exists(new 
StoragePath(partitionPath, logFile));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          return allExist;
+        })
+        .reduce(Boolean::logicalAnd)
+        .get());
+  }
+
+  private void validateFilesExistInCompactionPlan(String compactionInstant) {
+    HoodieCompactionPlan compactionPlan = 
CompactionUtils.getCompactionPlan(metaClient, compactionInstant);
+    assertTrue(compactionPlan.getOperations().stream()
+        .map(op -> {
+          boolean allExist;
+          StoragePath partitionPath = new StoragePath(basePath(), 
op.getPartitionPath());
+          try {
+            allExist = hoodieStorage().exists(new StoragePath(partitionPath, 
op.getDataFilePath()));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          for (String logFilePath : op.getDeltaFilePaths()) {
+            try {
+              allExist = allExist && hoodieStorage().exists(new 
StoragePath(partitionPath, logFilePath));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          return allExist;
+        })
+        .reduce(Boolean::logicalAnd)
+        .get());
+  }
+
+  private void validateFileListingInMetadataTable() {
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context(), 
hoodieStorage(), basePath(), false, false)
+        .stream()
+        .map(e -> new StoragePath(basePath(), e).toString())
+        .collect(Collectors.toList());
+    Map<String, List<StoragePathInfo>> filesFromStorage = 
FSUtils.getFilesInPartitions(
+        context(),
+        hoodieStorage(),
+        HoodieMetadataConfig.newBuilder().enable(false).build(),
+        basePath(),
+        partitionPaths.toArray(new String[0]));
+    Map<String, List<StoragePathInfo>> filesFromMetadataTable = 
FSUtils.getFilesInPartitions(
+        context(),
+        hoodieStorage(),
+        HoodieMetadataConfig.newBuilder().enable(true).build(),
+        basePath(),
+        partitionPaths.toArray(new String[0]));
+    assertEquals(filesFromStorage.size(), filesFromMetadataTable.size());
+    for (String partition : filesFromStorage.keySet()) {
+      List<StoragePathInfo> partitionFilesFromStorage = 
filesFromStorage.get(partition).stream()
+          .sorted().collect(Collectors.toList());
+      List<StoragePathInfo> partitionFilesFromMetadataTable = 
filesFromMetadataTable.get(partition).stream()
+          .sorted().collect(Collectors.toList());
+      assertEquals(partitionFilesFromStorage.size(), 
partitionFilesFromMetadataTable.size());
+      for (int i = 0; i < partitionFilesFromStorage.size(); i++) {
+        assertEquals(
+            partitionFilesFromStorage.get(i).getPath().toString(),
+            partitionFilesFromMetadataTable.get(i).getPath().toString());
+      }
+    }
   }
 
   private long readTableTotalRecordsNum() {
@@ -191,19 +387,29 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
         Arrays.stream(dataGen.getPartitionPaths()).map(p -> 
Paths.get(basePath(), p).toString()).collect(Collectors.toList()), 
basePath()).size();
   }
 
-  private List<WriteStatus> writeData(String instant, int numRecords, boolean 
doCommit) {
+  private List<WriteStatus> writeData(String instant,
+                                      int numRecords,
+                                      boolean doCommit,
+                                      boolean generateUpdates) throws 
IOException {
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    JavaRDD records = jsc().parallelize(dataGen.generateInserts(instant, 
numRecords), 2);
+    List<HoodieRecord> recordList = generateUpdates
+        ? dataGen.generateUpdates(instant, numRecords) : 
dataGen.generateInserts(instant, numRecords);
+    JavaRDD records = jsc().parallelize(recordList, 2);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     client.startCommitWithTime(instant);
     List<WriteStatus> writeStatuses = client.upsert(records, 
instant).collect();
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
-      List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, 
context().parallelize(writeStatuses, 1), writeStats, Option.empty(), 
metaClient.getCommitActionType());
-      Assertions.assertTrue(committed);
+      commitToTable(instant, writeStatuses);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
     return writeStatuses;
   }
+
+  private void commitToTable(String instant, List<WriteStatus> writeStatuses) {
+    List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    boolean committed =
+        client.commitStats(instant, context().parallelize(writeStatuses, 1), 
writeStats, Option.empty(), metaClient.getCommitActionType());
+    assertTrue(committed);
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 4612e0eeda6..80b12ad91e2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -31,7 +31,9 @@ import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
@@ -47,14 +49,17 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -95,7 +100,7 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     String f0 = testTable.addRequestedCommit("000")
         .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
     testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, IOType.APPEND);
+        .withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
     List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
@@ -104,26 +109,62 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(names = {"APPEND"})
-  public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType 
testIOType) throws Exception {
+  @CsvSource(value = {"APPEND,true,true", "APPEND,true,false", 
"APPEND,false,true", "APPEND,false,false"})
+  public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType 
testIOType,
+                                                              boolean 
logFileInNewFileGroup,
+                                                              boolean 
logFileExists) throws Exception {
     tearDown();
     tableType = HoodieTableType.MERGE_ON_READ;
     setUp();
+    String partitionPath = "partA";
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f0 = testTable.addRequestedCommit("000")
-        .getFileIdWithLogFile("partA");
-    testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, testIOType);
+    String f0 = logFileInNewFileGroup ? UUID.randomUUID().toString()
+        : testTable.addDeltaCommit("000").getFileIdWithLogFile(partitionPath);
+    String logFileName = EMPTY_STRING;
+    int logFileVersion = 1;
+    int logFileSize = 13042;
+    String logFileBaseInstantTime = logFileInNewFileGroup ? "001" : "000";
+    // log file name should still use the base instant time
+    testTable.addInflightDeltaCommit("001")
+        .withLogMarkerFile(logFileBaseInstantTime, partitionPath, f0, 
testIOType, logFileVersion);
+    if (logFileExists) {
+      testTable.withLogFilesAndBaseInstantTimeInPartition(
+          partitionPath, Collections.singletonList(
+              Pair.of(Pair.of(logFileBaseInstantTime, f0), new Integer[] 
{logFileVersion, logFileSize})));
+      logFileName = testTable.getLogFileNameById(logFileBaseInstantTime, f0, 
logFileVersion);
+    }
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
     List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(), "002")
         .getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
     assertEquals(1, rollbackRequests.size());
     HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
-    assertEquals("partA", rollbackRequest.getPartitionPath());
-    assertEquals(f0, rollbackRequest.getFileId());
-    assertEquals(testIOType.equals(IOType.CREATE) ? 1 : 0, 
rollbackRequest.getFilesToBeDeleted().size());
-    assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+    assertEquals(partitionPath, rollbackRequest.getPartitionPath());
+    assertEquals((logFileInNewFileGroup && logFileExists) ? EMPTY_STRING : f0, 
rollbackRequest.getFileId());
+    if (logFileExists) {
+      if (logFileInNewFileGroup) {
+        // log file is written to a new file group in the failed instant; the 
rollback plan
+        // should include it in the "filesToBeDeleted" field so it is going to 
be deleted
+        assertEquals(1, rollbackRequest.getFilesToBeDeleted().size());
+        assertEquals(
+            new StoragePath(new StoragePath(basePath, partitionPath), 
logFileName).toString(),
+            rollbackRequest.getFilesToBeDeleted().get(0));
+        assertEquals(0, rollbackRequest.getLogBlocksToBeDeleted().size());
+      } else {
+        // log file is written to an existing file group in the failed 
instant; the rollback plan
+        // should include it in the "logBlocksToBeDeleted" field, so it is 
kept on storage and
+        // rolled back through a command log block
+        assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+        assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+        
assertTrue(rollbackRequest.getLogBlocksToBeDeleted().containsKey(logFileName));
+        assertEquals(logFileSize, 
rollbackRequest.getLogBlocksToBeDeleted().get(logFileName));
+      }
+    } else {
+      // log file marker exists but the log file is not written to the 
storage, so the rollback
+      // plan should not include it.
+      assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+      assertEquals(0, rollbackRequest.getLogBlocksToBeDeleted().size());
+    }
   }
 
   @Test
@@ -295,7 +336,7 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     String f0 = testTable.addRequestedCommit("000")
         .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
     testTable.forCommit("001")
-        .withLogMarkerFile("partA", f0, IOType.APPEND);
+        .withLogMarkerFile("000", "partA", f0, IOType.APPEND, 1);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 896310f114d..4f5aa34e8c3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -418,14 +418,16 @@ public class FileCreateUtils {
 
   public static String createLogFileMarker(String basePath, String 
partitionPath, String instantTime, String fileId, IOType ioType)
       throws IOException {
-    return createLogFileMarker(basePath, partitionPath, instantTime, fileId, 
ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
+    return createLogFileMarker(basePath, partitionPath, instantTime, 
instantTime, fileId, ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
   }
 
-  public static String createLogFileMarker(String basePath, String 
partitionPath, String instantTime, String fileId, IOType ioType, int logVersion)
+  public static String createLogFileMarker(String basePath, String 
partitionPath, String baseInstantTime, String instantTime, String fileId,
+                                           IOType ioType,
+                                           int logVersion)
       throws IOException {
     Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
     Files.createDirectories(parentPath);
-    Path markerFilePath = 
parentPath.resolve(logFileMarkerFileName(instantTime, fileId, ioType, 
logVersion));
+    Path markerFilePath = 
parentPath.resolve(logFileMarkerFileName(baseInstantTime, fileId, ioType, 
logVersion));
     if (Files.notExists(markerFilePath)) {
       Files.createFile(markerFilePath);
     }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 32ca659918a..45f6ebaba14 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -614,8 +614,9 @@ public class HoodieTestTable {
     return this;
   }
 
-  public HoodieTestTable withLogMarkerFile(String partitionPath, String 
fileId, IOType ioType) throws IOException {
-    createLogFileMarker(basePath, partitionPath, currentInstantTime, fileId, 
ioType);
+  public HoodieTestTable withLogMarkerFile(String baseInstantTime, String 
partitionPath, String fileId, IOType ioType, int logVersion)
+      throws IOException {
+    createLogFileMarker(basePath, partitionPath, baseInstantTime, 
currentInstantTime, fileId, ioType, logVersion);
     return this;
   }
 
@@ -683,9 +684,32 @@ public class HoodieTestTable {
     return Pair.of(this, logFiles);
   }
 
+  /**
+   * Writes log files in the partition.
+   *
+   * @param partition partition to write log files
+   * @param fileInfos list of pairs of file ID, log version, and file size of 
the log files
+   * @return {@link HoodieTestTable} instance
+   * @throws Exception upon error
+   */
   public HoodieTestTable withLogFilesInPartition(String partition, 
List<Pair<String, Integer[]>> fileInfos) throws Exception {
-    for (Pair<String, Integer[]> fileInfo : fileInfos) {
-      FileCreateUtils.createLogFile(basePath, partition, currentInstantTime, 
fileInfo.getKey(), fileInfo.getValue()[0], fileInfo.getValue()[1]);
+    return withLogFilesAndBaseInstantTimeInPartition(partition,
+        fileInfos.stream().map(e -> Pair.of(Pair.of(currentInstantTime, 
e.getLeft()), e.getRight())).collect(Collectors.toList()));
+  }
+
+  /**
+   * Writes log files in the partition.
+   *
+   * @param partition partition to write log files
+   * @param fileInfos list of pairs of base instant time, file ID, log 
version, and file size of the log files
+   * @return {@link HoodieTestTable} instance
+   * @throws Exception upon error
+   */
+  public HoodieTestTable withLogFilesAndBaseInstantTimeInPartition(String 
partition, List<Pair<Pair<String, String>, Integer[]>> fileInfos)
+      throws Exception {
+    for (Pair<Pair<String, String>, Integer[]> fileInfo : fileInfos) {
+      FileCreateUtils.createLogFile(
+          basePath, partition, fileInfo.getKey().getKey(), 
fileInfo.getKey().getValue(), fileInfo.getValue()[0], fileInfo.getValue()[1]);
     }
     return this;
   }
@@ -767,11 +791,11 @@ public class HoodieTestTable {
   }
 
   public Path getLogFilePath(String partition, String fileId, int version) {
-    return new Path(Paths.get(basePath, partition, getLogFileNameById(fileId, 
version)).toString());
+    return new Path(Paths.get(basePath, partition, 
getLogFileNameById(currentInstantTime, fileId, version)).toString());
   }
 
-  public String getLogFileNameById(String fileId, int version) {
-    return logFileName(currentInstantTime, fileId, version);
+  public String getLogFileNameById(String baseInstantTime, String fileId, int 
version) {
+    return logFileName(baseInstantTime, fileId, version);
   }
 
   public List<String> getEarliestFilesInPartition(String partition, int count) 
throws IOException {

Reply via email to