This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a8bde0e9 [core] Allow cache and refresh partitions for CachingCatalog 
(#4427)
5a8bde0e9 is described below

commit 5a8bde0e9c0f2f41864fa8687b55e2d72d57b0ee
Author: Jiao Mingye <[email protected]>
AuthorDate: Tue Nov 5 13:31:27 2024 +0800

    [core] Allow cache and refresh partitions for CachingCatalog (#4427)
---
 .../generated/catalog_configuration.html           |  6 ++
 .../org/apache/paimon/options/CatalogOptions.java  |  7 +++
 .../org/apache/paimon/catalog/CachingCatalog.java  | 72 +++++++++++++++++++++-
 .../apache/paimon/catalog/CachingCatalogTest.java  | 62 ++++++++++++++-----
 .../paimon/catalog/TestableCachingCatalog.java     | 14 +++--
 5 files changed, 138 insertions(+), 23 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index b9d74c812..94f11c6c4 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,12 @@ under the License.
             <td>Duration</td>
             <td>Controls the duration for which databases and tables in the 
catalog are cached.</td>
         </tr>
+        <tr>
+            <td><h5>cache.partition.max-num</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Long</td>
+            <td>Controls the maximum number of partitions cached in the 
catalog.</td>
+        </tr>
         <tr>
             <td><h5>cache.manifest.max-memory</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 081668675..0d8a9290a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -98,6 +98,13 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls the duration for which databases and 
tables in the catalog are cached.");
 
+    public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
+            key("cache.partition.max-num")
+                    .longType()
+                    .defaultValue(0L)
+                    .withDescription(
+                            "Controls the maximum number of partitions cached 
in the catalog.");
+
     public static final ConfigOption<MemorySize> 
CACHE_MANIFEST_SMALL_FILE_MEMORY =
             key("cache.manifest.small-file-memory")
                     .memoryType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 077775945..444a828af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
@@ -52,6 +53,7 @@ import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
 import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
 
 /** A {@link Catalog} to cache databases and tables and manifests. */
@@ -61,26 +63,31 @@ public class CachingCatalog extends DelegateCatalog {
 
     protected final Cache<String, Map<String, String>> databaseCache;
     protected final Cache<Identifier, Table> tableCache;
+    @Nullable protected final Cache<Identifier, List<PartitionEntry>> 
partitionCache;
     @Nullable protected final SegmentsCache<Path> manifestCache;
+    private final long cachedPartitionMaxNum;
 
     public CachingCatalog(Catalog wrapped) {
         this(
                 wrapped,
                 CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
                 CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
-                CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
+                CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
+                CACHE_PARTITION_MAX_NUM.defaultValue());
     }
 
     public CachingCatalog(
             Catalog wrapped,
             Duration expirationInterval,
             MemorySize manifestMaxMemory,
-            long manifestCacheThreshold) {
+            long manifestCacheThreshold,
+            long cachedPartitionMaxNum) {
         this(
                 wrapped,
                 expirationInterval,
                 manifestMaxMemory,
                 manifestCacheThreshold,
+                cachedPartitionMaxNum,
                 Ticker.systemTicker());
     }
 
@@ -89,6 +96,7 @@ public class CachingCatalog extends DelegateCatalog {
             Duration expirationInterval,
             MemorySize manifestMaxMemory,
             long manifestCacheThreshold,
+            long cachedPartitionMaxNum,
             Ticker ticker) {
         super(wrapped);
         if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -111,7 +119,19 @@ public class CachingCatalog extends DelegateCatalog {
                         .expireAfterAccess(expirationInterval)
                         .ticker(ticker)
                         .build();
+        this.partitionCache =
+                cachedPartitionMaxNum == 0
+                        ? null
+                        : Caffeine.newBuilder()
+                                .softValues()
+                                .executor(Runnable::run)
+                                .expireAfterAccess(expirationInterval)
+                                .weigher(this::weigh)
+                                .maximumWeight(cachedPartitionMaxNum)
+                                .ticker(ticker)
+                                .build();
         this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
+        this.cachedPartitionMaxNum = cachedPartitionMaxNum;
     }
 
     public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -131,7 +151,8 @@ public class CachingCatalog extends DelegateCatalog {
                 catalog,
                 options.get(CACHE_EXPIRATION_INTERVAL_MS),
                 manifestMaxMemory,
-                manifestThreshold);
+                manifestThreshold,
+                options.get(CACHE_PARTITION_MAX_NUM));
     }
 
     @Override
@@ -227,6 +248,51 @@ public class CachingCatalog extends DelegateCatalog {
         tableCache.put(identifier, table);
     }
 
+    public List<PartitionEntry> getPartitions(Identifier identifier) throws 
TableNotExistException {
+        Table table = this.getTable(identifier);
+        if (partitionCacheEnabled(table)) {
+            List<PartitionEntry> partitions;
+            partitions = partitionCache.getIfPresent(identifier);
+            if (partitions == null || partitions.isEmpty()) {
+                partitions = this.refreshPartitions(identifier);
+            }
+            return partitions;
+        }
+        return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+    }
+
+    public List<PartitionEntry> refreshPartitions(Identifier identifier)
+            throws TableNotExistException {
+        Table table = this.getTable(identifier);
+        List<PartitionEntry> partitions =
+                ((FileStoreTable) 
table).newSnapshotReader().partitionEntries();
+        if (partitionCacheEnabled(table)
+                && 
partitionCache.asMap().values().stream().mapToInt(List::size).sum()
+                        < this.cachedPartitionMaxNum) {
+            partitionCache.put(identifier, partitions);
+        }
+        return partitions;
+    }
+
+    private boolean partitionCacheEnabled(Table table) {
+        return partitionCache != null
+                && table instanceof FileStoreTable
+                && !table.partitionKeys().isEmpty();
+    }
+
+    private int weigh(Identifier identifier, List<PartitionEntry> partitions) {
+        return partitions.size();
+    }
+
+    @Override
+    public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
+            throws TableNotExistException, PartitionNotExistException {
+        wrapped.dropPartition(identifier, partitions);
+        if (partitionCache != null) {
+            partitionCache.invalidate(identifier);
+        }
+    }
+
     private class TableInvalidatingRemovalListener implements 
RemovalListener<Identifier, Table> {
         @Override
         public void onRemoval(Identifier identifier, Table table, @NonNull 
RemovalCause cause) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index d1f7eeb8a..d645c46bf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -20,8 +20,10 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -32,6 +34,8 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.FakeTicker;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -44,6 +48,7 @@ import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +56,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
@@ -113,15 +120,15 @@ class CachingCatalogTest extends CatalogTestBase {
         Table table = catalog.getTable(tableIdent);
 
         // Ensure table is cached with full ttl remaining upon creation
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
 
         ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
 
         ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
-        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
         assertThat(catalog.getTable(tableIdent))
                 .as("CachingCatalog should return a new instance after 
expiration")
                 .isNotSameAs(table);
@@ -135,11 +142,11 @@ class CachingCatalogTest extends CatalogTestBase {
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
         catalog.getTable(tableIdent);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
 
         ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
         assertThat(catalog.remainingAgeFor(tableIdent))
                 .isPresent()
@@ -148,7 +155,7 @@ class CachingCatalogTest extends CatalogTestBase {
 
         Duration oneMinute = Duration.ofMinutes(1L);
         ticker.advance(oneMinute);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         assertThat(catalog.ageOf(tableIdent))
                 .isPresent()
                 .get()
@@ -175,17 +182,17 @@ class CachingCatalogTest extends CatalogTestBase {
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
         Table table = catalog.getTable(tableIdent);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
 
         ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
 
         for (Identifier sysTable : sysTables(tableIdent)) {
             catalog.getTable(sysTable);
         }
-        
assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+        
assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
         assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
                 .isNotEmpty()
                 .allMatch(age -> age.isPresent() && 
age.get().equals(Duration.ZERO));
@@ -209,17 +216,39 @@ class CachingCatalogTest extends CatalogTestBase {
 
         // Move time forward so the data table drops.
         ticker.advance(HALF_OF_EXPIRATION);
-        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
 
         Arrays.stream(sysTables(tableIdent))
                 .forEach(
                         sysTable ->
-                                assertThat(catalog.cache().asMap())
+                                assertThat(catalog.tableCache().asMap())
                                         .as(
                                                 "When a data table expires, 
its sys tables should expire regardless of age")
                                         .doesNotContainKeys(sysTable));
     }
 
+    @Test
+    public void testPartitionCache() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+
+        Identifier tableIdent = new Identifier("db", "tbl");
+        Schema schema =
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
+                        singletonList("f0"),
+                        emptyList(),
+                        Collections.emptyMap(),
+                        "");
+        catalog.createTable(tableIdent, schema, false);
+        List<PartitionEntry> partitionEntryList = 
catalog.getPartitions(tableIdent);
+        assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
+        List<PartitionEntry> partitionEntryListFromCache =
+                catalog.partitionCache().getIfPresent(tableIdent);
+        assertThat(partitionEntryListFromCache).isNotNull();
+        
assertThat(partitionEntryListFromCache).containsAll(partitionEntryList);
+    }
+
     @Test
     public void testDeadlock() throws Exception {
         Catalog underlyCatalog = this.catalog;
@@ -233,7 +262,7 @@ class CachingCatalogTest extends CatalogTestBase {
             createdTables.add(tableIdent);
         }
 
-        Cache<Identifier, Table> cache = catalog.cache();
+        Cache<Identifier, Table> cache = catalog.tableCache();
         AtomicInteger cacheGetCount = new AtomicInteger(0);
         AtomicInteger cacheCleanupCount = new AtomicInteger(0);
         ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -288,10 +317,10 @@ class CachingCatalogTest extends CatalogTestBase {
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
         catalog.getTable(tableIdent);
-        assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
         catalog.dropTable(tableIdent, false);
-        assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
-        
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+        assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
+        
assertThat(wrappedCatalog.tableCache().asMap()).doesNotContainKey(tableIdent);
     }
 
     public static Identifier[] sysTables(Identifier tableIdent) {
@@ -313,7 +342,8 @@ class CachingCatalogTest extends CatalogTestBase {
                         this.catalog,
                         Duration.ofSeconds(10),
                         MemorySize.ofMebiBytes(1),
-                        manifestCacheThreshold);
+                        manifestCacheThreshold,
+                        0L);
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.dropTable(tableIdent, true);
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 159f5edae..4c70a0232 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.Table;
 
@@ -25,6 +26,7 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -36,18 +38,22 @@ public class TestableCachingCatalog extends CachingCatalog {
     private final Duration cacheExpirationInterval;
 
     public TestableCachingCatalog(Catalog catalog, Duration 
expirationInterval, Ticker ticker) {
-        super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, 
ticker);
+        super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, 
Long.MAX_VALUE, ticker);
         this.cacheExpirationInterval = expirationInterval;
     }
 
-    public Cache<Identifier, Table> cache() {
+    public Cache<Identifier, Table> tableCache() {
         // cleanUp must be called as tests apply assertions directly on the 
underlying map, but
-        // metadata
-        // table map entries are cleaned up asynchronously.
+        // metadata table map entries are cleaned up asynchronously.
         tableCache.cleanUp();
         return tableCache;
     }
 
+    public Cache<Identifier, List<PartitionEntry>> partitionCache() {
+        partitionCache.cleanUp();
+        return partitionCache;
+    }
+
     public Optional<Duration> ageOf(Identifier identifier) {
         return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
     }

Reply via email to