This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e87a92d603 [HUDI-3453] Fix HoodieBackedTableMetadata concurrent
reading issue (#5091)
e87a92d603 is described below
commit e87a92d603e2476a2c952c1bf2b178b2885907ab
Author: YueZhang <[email protected]>
AuthorDate: Sat Sep 10 02:51:44 2022 +0800
[HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue (#5091)
Co-authored-by: yuezhang <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../functional/TestHoodieBackedTableMetadata.java | 59 ++++++++++++++++++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 16 ++++--
2 files changed, 70 insertions(+), 5 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index f029bbe33b..08e95bb57b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -66,10 +66,19 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -94,6 +103,56 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
verifyBaseMetadataTable(reuseReaders);
}
+ /**
+ * Create a COW table and call the getAllFilesInPartition API in parallel,
which reads data files from the MDT.
+ * This UT guards that concurrent readers of the MDT#getAllFilesInPartition
API are safe.
+ * @param reuse whether to reuse the metadata table readers
+ * @throws Exception
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse)
throws Exception {
+ final int taskNumber = 20;
+ HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+ init(tableType);
+ testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
+ HoodieBackedTableMetadata tableMetadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
+ assertTrue(tableMetadata.enabled());
+ List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+ String partition = metadataPartitions.get(0);
+ String finalPartition = basePath + "/" + partition;
+ ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
+ AtomicBoolean flag = new AtomicBoolean(false);
+ CountDownLatch downLatch = new CountDownLatch(taskNumber);
+ AtomicInteger filesNumber = new AtomicInteger(0);
+
+ // call the getAllFilesInPartition API on the metadata table in parallel
+ for (int i = 0; i < taskNumber; i++) {
+ executors.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ downLatch.countDown();
+ downLatch.await();
+ FileStatus[] files = tableMetadata.getAllFilesInPartition(new
Path(finalPartition));
+ if (files.length != 1) {
+ LOG.warn("Mismatched data file numbers.");
+ throw new RuntimeException("Mismatched data file numbers.");
+ }
+ filesNumber.addAndGet(files.length);
+ } catch (Exception e) {
+ LOG.warn("Caught exception while reading data files from MDT.", e);
+ flag.compareAndSet(false, true);
+ }
+ }
+ });
+ }
+ executors.shutdown();
+ executors.awaitTermination(5, TimeUnit.MINUTES);
+ assertFalse(flag.get());
+ assertEquals(filesNumber.get(), taskNumber);
+ }
+
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws
Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d4865875b1..cb9fb8da14 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,9 +18,6 @@
package org.apache.hudi.metadata;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +51,10 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -231,7 +232,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
throw new HoodieIOException("Error merging records from metadata table
for " + sortedKeys.size() + " key : ", ioe);
} finally {
if (!reuse) {
- close(Pair.of(partitionFileSlicePair.getLeft(),
partitionFileSlicePair.getRight().getFileId()));
+ closeReader(readers);
}
}
});
@@ -397,7 +398,12 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
* @return File reader and the record scanner pair for the requested file
slice
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
getOrCreateReaders(String partitionName, FileSlice slice) {
- return partitionReaders.computeIfAbsent(Pair.of(partitionName,
slice.getFileId()), k -> openReaders(partitionName, slice));
+ if (reuse) {
+ return partitionReaders.computeIfAbsent(Pair.of(partitionName,
slice.getFileId()), k -> {
+ return openReaders(partitionName, slice); });
+ } else {
+ return openReaders(partitionName, slice);
+ }
}
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReaders(String partitionName, FileSlice slice) {