This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fc7f0da95 [core] Coalesce concurrent CachingCatalog table loads (#7840)
8fc7f0da95 is described below

commit 8fc7f0da959a51b4a742dc032e46e7eb0b8de8de
Author: Asish Kumar <[email protected]>
AuthorDate: Wed May 13 18:55:45 2026 +0530

    [core] Coalesce concurrent CachingCatalog table loads (#7840)
---
 .../org/apache/paimon/catalog/CachingCatalog.java  | 40 +++++++++++++++------
 .../apache/paimon/catalog/CachingCatalogTest.java  | 41 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index aad5d16a11..c779db7d20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -226,11 +226,6 @@ public class CachingCatalog extends DelegateCatalog {
 
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException {
-        Table table = tableCache.getIfPresent(identifier);
-        if (table != null) {
-            return table;
-        }
-
         // For system table, do not cache it directly. Instead, cache the origin table and then wrap
         // it to generate the system table.
         if (identifier.isSystemTable()) {
@@ -241,7 +236,7 @@ public class CachingCatalog extends DelegateCatalog {
                             identifier.getBranchName(),
                             null);
             Table originTable = getTable(originIdentifier);
-            table =
+            Table table =
                     SystemTableLoader.load(
                             checkNotNull(identifier.getSystemTableName()),
                             (FileStoreTable) originTable);
@@ -251,12 +246,21 @@ public class CachingCatalog extends DelegateCatalog {
             return table;
         }
 
-        table = wrapped.getTable(identifier);
-        putTableCache(identifier, table);
-        return table;
+        try {
+            return tableCache.get(identifier, this::loadTable);
+        } catch (TableLoadingException e) {
+            throw e.tableNotExistException();
+        }
     }
 
-    private void putTableCache(Identifier identifier, Table table) {
+    private Table loadTable(Identifier identifier) {
+        Table table;
+        try {
+            table = wrapped.getTable(identifier);
+        } catch (TableNotExistException e) {
+            throw new TableLoadingException(e);
+        }
+
         if (table instanceof FileStoreTable) {
             FileStoreTable storeTable = (FileStoreTable) table;
             storeTable.setSnapshotCache(
@@ -283,7 +287,21 @@ public class CachingCatalog extends DelegateCatalog {
             }
         }
 
-        tableCache.put(identifier, table);
+        return table;
+    }
+
+    private static class TableLoadingException extends RuntimeException {
+
+        private final TableNotExistException tableNotExistException;
+
+        private TableLoadingException(TableNotExistException cause) {
+            super(cause);
+            this.tableNotExistException = cause;
+        }
+
+        private TableNotExistException tableNotExistException() {
+            return tableNotExistException;
+        }
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 153f2af81d..53928aef66 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -56,8 +56,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -175,6 +177,45 @@ class CachingCatalogTest extends CatalogTestBase {
                 .hasMessage("Table db.tbl$branch_b1 does not exist.");
     }
 
+    @Test
+    public void testConcurrentGetTableLoadsTableOnce() throws Exception {
+        Catalog wrapped = Mockito.mock(Catalog.class);
+        CachingCatalog catalog = new CachingCatalog(wrapped, new Options());
+        Identifier tableIdent = new Identifier("db", "tbl");
+        Table table = Mockito.mock(Table.class);
+        AtomicInteger loadCount = new AtomicInteger();
+        CountDownLatch loadStarted = new CountDownLatch(1);
+        CountDownLatch releaseLoad = new CountDownLatch(1);
+        when(wrapped.getTable(tableIdent))
+                .thenAnswer(
+                        invocation -> {
+                            loadCount.incrementAndGet();
+                            loadStarted.countDown();
+                            assertThat(releaseLoad.await(10, TimeUnit.SECONDS)).isTrue();
+                            return table;
+                        });
+
+        int numThreads = 8;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        try {
+            List<Future<Table>> futures = new ArrayList<>();
+            for (int i = 0; i < numThreads; i++) {
+                futures.add(executor.submit(() -> catalog.getTable(tableIdent)));
+            }
+
+            assertThat(loadStarted.await(10, TimeUnit.SECONDS)).isTrue();
+            releaseLoad.countDown();
+
+            for (Future<Table> future : futures) {
+                assertThat(future.get(10, TimeUnit.SECONDS)).isSameAs(table);
+            }
+            assertThat(loadCount).hasValue(1);
+        } finally {
+            releaseLoad.countDown();
+            executor.shutdownNow();
+        }
+    }
+
     @Test
     public void testTableExpiresAfterInterval() throws Exception {
         TestableCachingCatalog catalog =

Reply via email to