This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8fc7f0da95 [core] Coalesce concurrent CachingCatalog table loads (#7840)
8fc7f0da95 is described below
commit 8fc7f0da959a51b4a742dc032e46e7eb0b8de8de
Author: Asish Kumar <[email protected]>
AuthorDate: Wed May 13 18:55:45 2026 +0530
[core] Coalesce concurrent CachingCatalog table loads (#7840)
---
.../org/apache/paimon/catalog/CachingCatalog.java | 40 +++++++++++++++------
.../apache/paimon/catalog/CachingCatalogTest.java | 41 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index aad5d16a11..c779db7d20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -226,11 +226,6 @@ public class CachingCatalog extends DelegateCatalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
- Table table = tableCache.getIfPresent(identifier);
- if (table != null) {
- return table;
- }
-
// For system table, do not cache it directly. Instead, cache the origin table and then wrap
// it to generate the system table.
if (identifier.isSystemTable()) {
@@ -241,7 +236,7 @@ public class CachingCatalog extends DelegateCatalog {
identifier.getBranchName(),
null);
Table originTable = getTable(originIdentifier);
- table =
+ Table table =
SystemTableLoader.load(
checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
@@ -251,12 +246,21 @@ public class CachingCatalog extends DelegateCatalog {
return table;
}
- table = wrapped.getTable(identifier);
- putTableCache(identifier, table);
- return table;
+ try {
+ return tableCache.get(identifier, this::loadTable);
+ } catch (TableLoadingException e) {
+ throw e.tableNotExistException();
+ }
}
- private void putTableCache(Identifier identifier, Table table) {
+ private Table loadTable(Identifier identifier) {
+ Table table;
+ try {
+ table = wrapped.getTable(identifier);
+ } catch (TableNotExistException e) {
+ throw new TableLoadingException(e);
+ }
+
if (table instanceof FileStoreTable) {
FileStoreTable storeTable = (FileStoreTable) table;
storeTable.setSnapshotCache(
@@ -283,7 +287,21 @@ public class CachingCatalog extends DelegateCatalog {
}
}
- tableCache.put(identifier, table);
+ return table;
+ }
+
+ private static class TableLoadingException extends RuntimeException {
+
+ private final TableNotExistException tableNotExistException;
+
+ private TableLoadingException(TableNotExistException cause) {
+ super(cause);
+ this.tableNotExistException = cause;
+ }
+
+ private TableNotExistException tableNotExistException() {
+ return tableNotExistException;
+ }
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 153f2af81d..53928aef66 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -56,8 +56,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -175,6 +177,45 @@ class CachingCatalogTest extends CatalogTestBase {
.hasMessage("Table db.tbl$branch_b1 does not exist.");
}
+ @Test
+ public void testConcurrentGetTableLoadsTableOnce() throws Exception {
+ Catalog wrapped = Mockito.mock(Catalog.class);
+ CachingCatalog catalog = new CachingCatalog(wrapped, new Options());
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Table table = Mockito.mock(Table.class);
+ AtomicInteger loadCount = new AtomicInteger();
+ CountDownLatch loadStarted = new CountDownLatch(1);
+ CountDownLatch releaseLoad = new CountDownLatch(1);
+ when(wrapped.getTable(tableIdent))
+ .thenAnswer(
+ invocation -> {
+ loadCount.incrementAndGet();
+ loadStarted.countDown();
+ assertThat(releaseLoad.await(10, TimeUnit.SECONDS)).isTrue();
+ return table;
+ });
+
+ int numThreads = 8;
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ try {
+ List<Future<Table>> futures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(executor.submit(() -> catalog.getTable(tableIdent)));
+ }
+
+ assertThat(loadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+ releaseLoad.countDown();
+
+ for (Future<Table> future : futures) {
+ assertThat(future.get(10, TimeUnit.SECONDS)).isSameAs(table);
+ }
+ assertThat(loadCount).hasValue(1);
+ } finally {
+ releaseLoad.countDown();
+ executor.shutdownNow();
+ }
+ }
+
@Test
public void testTableExpiresAfterInterval() throws Exception {
TestableCachingCatalog catalog =