wwj6591812 commented on code in PR #4427:
URL: https://github.com/apache/paimon/pull/4427#discussion_r1826918980
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -61,26 +63,31 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
+ protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
+ private final long partitionMaxNum;
Review Comment:
I think cachePartitionMaxNum may be a better name?
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
Review Comment:
private
##########
paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java:
##########
@@ -209,17 +216,38 @@ public void testCacheExpirationEagerlyRemovesSysTables()
throws Exception {
// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
- assertThat(catalog.cache().asMap())
+ assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires,
its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}
+ @Test
+ public void testPartitionCache() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Schema schema =
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdent, schema, false);
+ List<PartitionEntry> partitionEntryList =
catalog.getPartitions(tableIdent);
+ assertThat(catalog.partitionCache().asMap().containsKey(tableIdent));
+ List<PartitionEntry> partitionEntryListFromCache =
+ catalog.partitionCache().getIfPresent(tableIdent);
+
assertThat(partitionEntryListFromCache.containsAll(partitionEntryList));
Review Comment:
partitionEntryListFromCache is nullable
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries();
+ if (enablePartitionCache(table)
+ &&
partitionCache.asMap().values().stream().mapToInt(List::size).sum()
+ < this.partitionMaxNum) {
+ partitionCache.put(identifier, partitions);
+ }
+ return partitions;
+ }
+
+ private boolean enablePartitionCache(Table table) {
Review Comment:
partitionCacheEnabled
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries();
+ if (enablePartitionCache(table)
Review Comment:
I think enablePartitionCache(table) must be true here, because if it is not
true, then refreshPartitions will not be called.
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries();
+ if (enablePartitionCache(table)
+ &&
partitionCache.asMap().values().stream().mapToInt(List::size).sum()
+ < this.partitionMaxNum) {
+ partitionCache.put(identifier, partitions);
Review Comment:
After this put, what if the total number of cached partitions in
partitionCache is larger than partitionMaxNum? How do you handle this situation?
##########
paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java:
##########
@@ -209,17 +216,38 @@ public void testCacheExpirationEagerlyRemovesSysTables()
throws Exception {
// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
- assertThat(catalog.cache().asMap())
+ assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires,
its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}
+ @Test
+ public void testPartitionCache() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Schema schema =
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdent, schema, false);
+ List<PartitionEntry> partitionEntryList =
catalog.getPartitions(tableIdent);
+ assertThat(catalog.partitionCache().asMap().containsKey(tableIdent));
+ List<PartitionEntry> partitionEntryListFromCache =
+ catalog.partitionCache().getIfPresent(tableIdent);
+
assertThat(partitionEntryListFromCache.containsAll(partitionEntryList));
Review Comment:
Assertions.assertTrue
##########
paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java:
##########
@@ -98,6 +98,13 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and
tables in the catalog are cached.");
+ public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
+ key("cache.partition.max-num")
+ .longType()
+ .defaultValue(0L)
+ .withDescription(
+ "Controls the duration for which databases and
tables in the catalog are cached.");
Review Comment:
duration?
I think "Controls the max number of cached partitions in the catalog" may
be better?
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries();
+ if (enablePartitionCache(table)
+ &&
partitionCache.asMap().values().stream().mapToInt(List::size).sum()
+ < this.partitionMaxNum) {
+ partitionCache.put(identifier, partitions);
+ }
+ return partitions;
+ }
+
+ private boolean enablePartitionCache(Table table) {
+ return partitionCache != null
+ && table instanceof FileStoreTable
+ && !table.partitionKeys().isEmpty();
+ }
+
+ @Override
+ public void dropPartition(Identifier identifier, Map<String, String>
partitions)
+ throws TableNotExistException, PartitionNotExistException {
+ wrapped.dropPartition(identifier, partitions);
+ if (partitionCache != null) {
+ partitionCache.invalidate(identifier);
Review Comment:
Why invalidate all partitions of this identifier in the cache?
##########
paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java:
##########
@@ -209,17 +216,38 @@ public void testCacheExpirationEagerlyRemovesSysTables()
throws Exception {
// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
- assertThat(catalog.cache().asMap())
+ assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires,
its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}
+ @Test
+ public void testPartitionCache() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Schema schema =
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdent, schema, false);
+ List<PartitionEntry> partitionEntryList =
catalog.getPartitions(tableIdent);
+ assertThat(catalog.partitionCache().asMap().containsKey(tableIdent));
Review Comment:
Assertions.assertTrue
##########
paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java:
##########
@@ -227,6 +244,47 @@ private void putTableCache(Identifier identifier, Table
table) {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (enablePartitionCache(table)) {
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries();
+ if (enablePartitionCache(table)
Review Comment:
I think enablePartitionCache(table) must be true here, because if it is not
true, then refreshPartitions will not be called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]