This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e87a92d603 [HUDI-3453] Fix HoodieBackedTableMetadata concurrent 
reading issue (#5091)
e87a92d603 is described below

commit e87a92d603e2476a2c952c1bf2b178b2885907ab
Author: YueZhang <[email protected]>
AuthorDate: Sat Sep 10 02:51:44 2022 +0800

    [HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue (#5091)
    
    Co-authored-by: yuezhang <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../functional/TestHoodieBackedTableMetadata.java  | 59 ++++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 16 ++++--
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index f029bbe33b..08e95bb57b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -66,10 +66,19 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -94,6 +103,56 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     verifyBaseMetadataTable(reuseReaders);
   }
 
+  /**
+   * Create a COW table and call the getAllFilesInPartition API in parallel, which 
reads data files from the MDT.
+   * This UT guards that concurrent readers of the MDT getAllFilesInPartition API 
are safe.
+   * @param reuse whether to reuse metadata table readers across lookups
+   * @throws Exception if the metadata table cannot be read
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) 
throws Exception {
+    final int taskNumber = 20;
+    HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+    init(tableType);
+    testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
+    HoodieBackedTableMetadata tableMetadata = new 
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
+    assertTrue(tableMetadata.enabled());
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+    String partition = metadataPartitions.get(0);
+    String finalPartition = basePath + "/" + partition;
+    ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
+    AtomicBoolean flag = new AtomicBoolean(false);
+    CountDownLatch downLatch = new CountDownLatch(taskNumber);
+    AtomicInteger filesNumber = new AtomicInteger(0);
+
+    // call getAllFilesInPartition api from meta data table in parallel
+    for (int i = 0; i < taskNumber; i++) {
+      executors.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            downLatch.countDown();
+            downLatch.await();
+            FileStatus[] files = tableMetadata.getAllFilesInPartition(new 
Path(finalPartition));
+            if (files.length != 1) {
+              LOG.warn("Miss match data file numbers.");
+              throw new RuntimeException("Miss match data file numbers.");
+            }
+            filesNumber.addAndGet(files.length);
+          } catch (Exception e) {
+            LOG.warn("Catch Exception while reading data files from MDT.", e);
+            flag.compareAndSet(false, true);
+          }
+        }
+      });
+    }
+    executors.shutdown();
+    executors.awaitTermination(5, TimeUnit.MINUTES);
+    assertFalse(flag.get());
+    assertEquals(filesNumber.get(), taskNumber);
+  }
+
   private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws 
Exception {
     doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d4865875b1..cb9fb8da14 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +51,10 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -231,7 +232,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         throw new HoodieIOException("Error merging records from metadata table 
for  " + sortedKeys.size() + " key : ", ioe);
       } finally {
         if (!reuse) {
-          close(Pair.of(partitionFileSlicePair.getLeft(), 
partitionFileSlicePair.getRight().getFileId()));
+          closeReader(readers);
         }
       }
     });
@@ -397,7 +398,12 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
    * @return File reader and the record scanner pair for the requested file 
slice
    */
   private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
getOrCreateReaders(String partitionName, FileSlice slice) {
-    return partitionReaders.computeIfAbsent(Pair.of(partitionName, 
slice.getFileId()), k -> openReaders(partitionName, slice));
+    if (reuse) {
+      return partitionReaders.computeIfAbsent(Pair.of(partitionName, 
slice.getFileId()), k -> {
+        return openReaders(partitionName, slice); });
+    } else {
+      return openReaders(partitionName, slice);
+    }
   }
 
   private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReaders(String partitionName, FileSlice slice) {

Reply via email to