This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e131507d4672316b6a0f2b2f3f63d8ad34e49b85
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Dec 4 00:44:50 2021 -0500

    [HUDI-2923] Fixing metadata table reader when metadata compaction is 
inflight (#4206)
    
    * [HUDI-2923] Fixing metadata table reader when metadata compaction is 
inflight
    
    * Fixing retry of pending compaction in metadata table and enhancing tests
    
    (cherry picked from commit 1d4fb827e73b2ae510f0e2c6510a448c0b5bd5b3)
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  7 ++
 .../metadata/HoodieBackedTableMetadataWriter.java  |  6 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../functional/TestHoodieBackedMetadata.java       | 86 ++++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  8 +-
 .../hudi/common/testutils/FileCreateUtils.java     | 10 +++
 8 files changed, 117 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 59acbb2..5c2bee1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -520,6 +520,13 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
   }
 
   /**
+   * Run any pending compactions.
+   */
+  public void runAnyPendingCompactions() {
+    runAnyPendingCompactions(createTable(config, hadoopConf, 
config.isMetadataTableEnabled()));
+  }
+
+  /**
    * Create a savepoint based on the latest commit action on the timeline.
    *
    * @param user - User creating the savepoint
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 54284fc..eb4b24a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -682,7 +682,10 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
    *      deltacommit.
    */
   protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, 
String instantTime) {
-    String latestDeltacommitTime = 
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+    // Finish off any pending compactions left over from a previous (failed) attempt.
+    writeClient.runAnyPendingCompactions();
+
+    String latestDeltacommitTime = 
metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
         .get().getTimestamp();
     List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
         
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
@@ -693,6 +696,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       return;
     }
 
+
     // Trigger compaction with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 0dcfcfc..d11f570 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -154,7 +154,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
    * The record is tagged with respective file slice's location based on its 
record key.
    */
   private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName);
+    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName, false);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
 
     return records.stream().map(r -> {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 65ade82..b7b5961 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -169,7 +169,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
    * The record is tagged with respective file slice's location based on its 
record key.
    */
   private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName);
+    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName, false);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
 
     return recordsRDD.map(r -> {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index ed245da..0339c47 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -54,6 +54,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -413,6 +414,91 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     });
   }
 
+
+  /**
+   * Tests metadata table reads and compaction retry when a metadata table compaction is pending (inflight).
+   *
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMetadataTableWithPendingCompaction(boolean 
simulateFailedCompaction) throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableFullScan(true)
+            .enableMetrics(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(3)
+            .build()).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    doWriteOperation(testTable, "0000001", INSERT);
+    // create an inflight compaction in metadata table.
+    // not easy to create an inflight in metadata table directly, hence 
letting compaction succeed and then deleting the completed instant.
+    // this new write is expected to trigger metadata table compaction
+    String commitInstant = "0000002";
+    doWriteOperation(testTable, commitInstant, INSERT);
+
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    String metadataCompactionInstant = commitInstant + "001";
+    assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
+    assertEquals(tableMetadata.getLatestCompactionTime().get(), 
metadataCompactionInstant);
+
+    validateMetadata(testTable);
+    // Fetch compaction Commit file and rename to some other file. completed 
compaction meta file should have some serialized info that table interprets
+    // for future upserts. so, renaming the file here to some temp name and 
later renaming it back to same name.
+    java.nio.file.Path parentPath = Paths.get(metadataTableBasePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
+    java.nio.file.Path metaFilePath = 
parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+    java.nio.file.Path tempFilePath = 
FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
+    metaClient.reloadActiveTimeline();
+    testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
+    // this validation will exercise the code path where a compaction is 
inflight in metadata table, but still metadata based file listing should match 
non
+    // metadata based file listing.
+    validateMetadata(testTable);
+
+    if (simulateFailedCompaction) {
+      // this should retry the compaction in metadata table.
+      doWriteOperation(testTable, "0000003", INSERT);
+    } else {
+      // let the compaction succeed in metadata and validation should succeed.
+      FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath);
+    }
+
+    validateMetadata(testTable);
+
+    // add few more write and validate
+    doWriteOperation(testTable, "0000004", INSERT);
+    doWriteOperation(testTable, "0000005", UPSERT);
+    validateMetadata(testTable);
+
+    if (simulateFailedCompaction) {
+      //trigger another compaction failure.
+      metadataCompactionInstant = "0000005001";
+      tableMetadata = metadata(writeConfig, context);
+      assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
+      assertEquals(tableMetadata.getLatestCompactionTime().get(), 
metadataCompactionInstant);
+
+      // Fetch compaction Commit file and rename to some other file. completed 
compaction meta file should have some serialized info that table interprets
+      // for future upserts. so, renaming the file here to some temp name and 
later renaming it back to same name.
+      parentPath = Paths.get(metadataTableBasePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
+      metaFilePath = parentPath.resolve(metadataCompactionInstant + 
HoodieTimeline.COMMIT_EXTENSION);
+      tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, 
metadataCompactionInstant);
+
+      validateMetadata(testTable);
+
+      // this should retry the failed compaction in metadata table.
+      doWriteOperation(testTable, "0000006", INSERT);
+
+      validateMetadata(testTable);
+
+      // add few more write and validate
+      doWriteOperation(testTable, "0000007", INSERT);
+      doWriteOperation(testTable, "0000008", UPSERT);
+      validateMetadata(testTable);
+    }
+  }
+
   /**
    * Test rollback of various table operations sync to Metadata Table 
correctly.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 8a9f855..58c25a1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -245,7 +245,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
         // Metadata is in sync till the latest completed instant on the dataset
         HoodieTimer timer = new HoodieTimer().startTimer();
-        List<FileSlice> latestFileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName);
+        List<FileSlice> latestFileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName, true);
         if (latestFileSlices.size() == 0) {
           // empty partition
           return Pair.of(null, null);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 9078bd0..b4dfbbd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 
@@ -338,9 +339,10 @@ public class HoodieTableMetadataUtil {
    * The list of file slices returned is sorted in the correct order of file 
group name.
    * @param metaClient instance of {@link HoodieTableMetaClient}.
    * @param partition The name of the partition whose file groups are to be 
loaded.
+   * @param isReader true if reader code path, false otherwise.
    * @return List of latest file slices for all file groups in a given 
partition.
    */
-  public static List<FileSlice> 
loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, 
String partition) {
+  public static List<FileSlice> 
loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, 
String partition, boolean isReader) {
     LOG.info("Loading file groups for metadata table partition " + partition);
 
     // If there are no commits on the metadata table then the table's default 
FileSystemView will not return any file
@@ -352,7 +354,9 @@ public class HoodieTableMetadataUtil {
     }
 
     HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, timeline);
-    return fsView.getLatestFileSlices(partition).sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId()))
+    Stream<FileSlice> fileSliceStream = isReader ? 
fsView.getLatestMergedFileSlicesBeforeOrOn(partition, 
timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
+        fsView.getLatestFileSlices(partition);
+    return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId()))
         .collect(Collectors.toList());
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 15215b8..486c473 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -344,6 +344,16 @@ public class FileCreateUtils {
     removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION);
   }
 
+  public static java.nio.file.Path renameFileToTemp(java.nio.file.Path 
sourcePath, String instantTime) throws IOException {
+    java.nio.file.Path dummyFilePath = 
sourcePath.getParent().resolve(instantTime + ".temp");
+    Files.move(sourcePath, dummyFilePath);
+    return dummyFilePath;
+  }
+
+  public static void renameTempToMetaFile(java.nio.file.Path tempFilePath, 
java.nio.file.Path destPath) throws IOException {
+    Files.move(tempFilePath, destPath);
+  }
+
   public static long getTotalMarkerFileCount(String basePath, String 
partitionPath, String instantTime, IOType ioType) throws IOException {
     Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
     if (Files.notExists(parentPath)) {

Reply via email to