This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new acb17a1b859 branch-4.0: [Enhancement](iceberg) add iceberg expire
cache params in catalog property #57187 (#59177)
acb17a1b859 is described below
commit acb17a1b859ea89700c4d8de3ea7beb866dbe7a7
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 23 12:54:46 2025 +0800
branch-4.0: [Enhancement](iceberg) add iceberg expire cache params in
catalog property #57187 (#59177)
Cherry-picked from #57187
Co-authored-by: yaoxiao <[email protected]>
Co-authored-by: yaoxiao <[email protected]>
---
.../doris/datasource/ExternalMetaCacheMgr.java | 69 ++++++---
.../doris/datasource/hive/HMSExternalTable.java | 5 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 32 ++++
.../datasource/iceberg/IcebergMetadataCache.java | 43 ++++--
.../iceberg/IcebergMetadataCacheMgr.java | 51 -------
.../doris/datasource/iceberg/IcebergUtils.java | 10 +-
.../iceberg/source/IcebergApiSource.java | 7 +-
.../iceberg/source/IcebergHMSSource.java | 5 +-
.../doris/tablefunction/MetadataGenerator.java | 2 +-
.../iceberg/test_iceberg_table_meta_cache.groovy | 168 +++++++++++++++++++++
10 files changed, 293 insertions(+), 99 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 4932f3aa8f9..5b568ff491b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -30,8 +30,8 @@ import
org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
-import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
@@ -89,7 +89,10 @@ public class ExternalMetaCacheMgr {
private ExecutorService scheduleExecutor;
// catalog id -> HiveMetaStoreCache
- private final Map<Long, HiveMetaStoreCache> cacheMap =
Maps.newConcurrentMap();
+ private final Map<Long, HiveMetaStoreCache> hiveCacheMap =
Maps.newConcurrentMap();
+
+ // catalog id -> IcebergMetadataCache
+ private final Map<Long, IcebergMetadataCache> icebergCacheMap =
Maps.newConcurrentMap();
// catalog id -> table schema cache
private final Map<Long, ExternalSchemaCache> schemaCacheMap =
Maps.newHashMap();
// hudi partition manager
@@ -98,7 +101,6 @@ public class ExternalMetaCacheMgr {
private FileSystemCache fsCache;
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
- private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
@@ -130,7 +132,6 @@ public class ExternalMetaCacheMgr {
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
- icebergMetadataCacheMgr = new
IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new
PaimonMetadataCacheMgr(commonRefreshExecutor);
dorisExternalMetaCacheMgr = new
DorisExternalMetaCacheMgr(commonRefreshExecutor);
@@ -160,14 +161,14 @@ public class ExternalMetaCacheMgr {
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
- HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
+ HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId());
if (cache == null) {
- synchronized (cacheMap) {
- if (!cacheMap.containsKey(catalog.getId())) {
- cacheMap.put(catalog.getId(),
+ synchronized (hiveCacheMap) {
+ if (!hiveCacheMap.containsKey(catalog.getId())) {
+ hiveCacheMap.put(catalog.getId(),
new HiveMetaStoreCache(catalog,
commonRefreshExecutor, fileListingExecutor));
}
- cache = cacheMap.get(catalog.getId());
+ cache = hiveCacheMap.get(catalog.getId());
}
}
return cache;
@@ -202,8 +203,17 @@ public class ExternalMetaCacheMgr {
return hudiMetadataCacheMgr;
}
- public IcebergMetadataCache getIcebergMetadataCache() {
- return icebergMetadataCacheMgr.getIcebergMetadataCache();
+ public IcebergMetadataCache getIcebergMetadataCache(IcebergExternalCatalog
catalog) {
+ IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId());
+ if (cache == null) {
+ synchronized (icebergCacheMap) {
+ if (!icebergCacheMap.containsKey(catalog.getId())) {
+ icebergCacheMap.put(catalog.getId(), new
IcebergMetadataCache(catalog, commonRefreshExecutor));
+ }
+ cache = icebergCacheMap.get(catalog.getId());
+ }
+ }
+ return cache;
}
public PaimonMetadataCache getPaimonMetadataCache() {
@@ -227,7 +237,7 @@ public class ExternalMetaCacheMgr {
}
public void removeCache(long catalogId) {
- if (cacheMap.remove(catalogId) != null) {
+ if (hiveCacheMap.remove(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);
}
synchronized (schemaCacheMap) {
@@ -235,8 +245,10 @@ public class ExternalMetaCacheMgr {
LOG.info("remove schema cache for catalog {}", catalogId);
}
}
+ if (icebergCacheMap.remove(catalogId) != null) {
+ LOG.info("remove iceberg meta cache for catalog {}", catalogId);
+ }
hudiMetadataCacheMgr.removeCache(catalogId);
- icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
dorisExternalMetaCacheMgr.removeCache(catalogId);
@@ -249,12 +261,15 @@ public class ExternalMetaCacheMgr {
schemaCache.invalidateTableCache(dorisTable);
}
}
- HiveMetaStoreCache metaCache =
cacheMap.get(dorisTable.getCatalog().getId());
- if (metaCache != null) {
- metaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+ HiveMetaStoreCache hiveMetaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
+ if (hiveMetaCache != null) {
+
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+ }
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(dorisTable.getCatalog().getId());
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateTableCache(dorisTable);
}
hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
- icebergMetadataCacheMgr.invalidateTableCache(dorisTable);
maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
paimonMetadataCacheMgr.invalidateTableCache(dorisTable);
if (LOG.isDebugEnabled()) {
@@ -271,12 +286,15 @@ public class ExternalMetaCacheMgr {
schemaCache.invalidateDbCache(dbName);
}
}
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateDbCache(catalogId, dbName);
+ }
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
- icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
if (LOG.isDebugEnabled()) {
@@ -288,12 +306,15 @@ public class ExternalMetaCacheMgr {
synchronized (schemaCacheMap) {
schemaCacheMap.remove(catalogId);
}
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.invalidateAll();
}
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateCatalogCache(catalogId);
+ }
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
- icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
@@ -310,7 +331,7 @@ public class ExternalMetaCacheMgr {
public void addPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
@@ -328,7 +349,7 @@ public class ExternalMetaCacheMgr {
public void dropPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.dropPartitionsCache(table, partitionNames, true);
}
@@ -338,7 +359,7 @@ public class ExternalMetaCacheMgr {
}
public void invalidatePartitionsCache(ExternalTable dorisTable,
List<String> partitionNames) {
- HiveMetaStoreCache metaCache =
cacheMap.get(dorisTable.getCatalog().getId());
+ HiveMetaStoreCache metaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
if (metaCache != null) {
for (String partitionName : partitionNames) {
metaCache.invalidatePartitionCache(dorisTable, partitionName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 09c9526f08d..b7e0acb5c41 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
import org.apache.doris.datasource.iceberg.IcebergUtils;
@@ -813,7 +814,9 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
case ICEBERG:
if (GlobalVariable.enableFetchIcebergStats) {
return StatisticsUtil.getIcebergColumnStats(colName,
-
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+
.getIcebergMetadataCache((IcebergExternalCatalog) this.getCatalog())
+ .getIcebergTable(this));
} else {
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 90d7ab41ebe..a8e05f1c2ed 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -32,9 +32,12 @@ import
org.apache.doris.datasource.operations.ExternalMetadataOperations;
import
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
import org.apache.doris.transaction.TransactionManagerFactory;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.catalog.Catalog;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -46,6 +49,8 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String ICEBERG_DLF = "dlf";
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
+ public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND =
"iceberg.table.meta.cache.ttl-second";
+ public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND =
"iceberg.snapshot.meta.cache.ttl-second";
protected String icebergCatalogType;
protected Catalog catalog;
@@ -73,9 +78,36 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
@Override
public void checkProperties() throws DdlException {
super.checkProperties();
+ // check iceberg.table.meta.cache.ttl-second parameter
+ String tableMetaCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(tableMetaCacheTtlSecond) &&
NumberUtils.toInt(tableMetaCacheTtlSecond, CACHE_NO_TTL)
+ < CACHE_TTL_DISABLE_CACHE) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_TABLE_META_CACHE_TTL_SECOND + "
is wrong, value is "
+ + tableMetaCacheTtlSecond);
+ }
+
+ // check iceberg.snapshot.meta.cache.ttl-second parameter
+ String partitionCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(partitionCacheTtlSecond) &&
NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
+ < CACHE_TTL_DISABLE_CACHE) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND
+ " is wrong, value is "
+ + partitionCacheTtlSecond);
+ }
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}
+ @Override
+ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
+ super.notifyPropertiesUpdated(updatedProps);
+ String tableMetaCacheTtl =
updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
+ String snapshotMetaCacheTtl =
updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(tableMetaCacheTtl) ||
Objects.nonNull(snapshotMetaCacheTtl)) {
+
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
+ }
+ }
+
@Override
protected synchronized void initPreExecutionAuthenticator() {
if (executionAuthenticator == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 8ae6ce092c0..0d49965d44a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -51,37 +52,55 @@ import java.util.concurrent.ExecutorService;
public class IcebergMetadataCache {
private static final Logger LOG =
LogManager.getLogger(IcebergMetadataCache.class);
+ private final ExecutorService executor;
+ private final IcebergExternalCatalog catalog;
+ private LoadingCache<IcebergMetadataCacheKey, List<Snapshot>>
snapshotListCache;
+ private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
+ private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue>
snapshotCache;
+ private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+
+ public IcebergMetadataCache(IcebergExternalCatalog catalog,
ExecutorService executor) {
+ this.executor = executor;
+ this.catalog = catalog;
+ init();
+ }
+
+ public void init() {
+ long tableMetaCacheTtlSecond = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
+ ExternalCatalog.CACHE_NO_TTL);
- private final LoadingCache<IcebergMetadataCacheKey, List<Snapshot>>
snapshotListCache;
- private final LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
- private final LoadingCache<IcebergMetadataCacheKey,
IcebergSnapshotCacheValue> snapshotCache;
- private final LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+ long snapshotMetaCacheTtlSecond = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND),
+ ExternalCatalog.CACHE_NO_TTL);
- public IcebergMetadataCache(ExecutorService executor) {
CacheFactory snapshotListCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(snapshotMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? snapshotMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.snapshotListCache = snapshotListCacheFactory.buildCache(key ->
loadSnapshots(key), executor);
+ this.snapshotListCache =
snapshotListCacheFactory.buildCache(this::loadSnapshots, executor);
CacheFactory tableCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(tableMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? tableMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key),
executor);
+ this.tableCache = tableCacheFactory.buildCache(this::loadTable,
executor);
CacheFactory snapshotCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(snapshotMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? snapshotMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.snapshotCache = snapshotCacheFactory.buildCache(key ->
loadSnapshot(key), executor);
- this.viewCache = tableCacheFactory.buildCache(key -> loadView(key),
executor);
+ this.snapshotCache =
snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
+ this.viewCache = tableCacheFactory.buildCache(this::loadView,
executor);
}
public Table getIcebergTable(ExternalTable dorisTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
deleted file mode 100644
index 6e72dcf2fac..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg;
-
-import org.apache.doris.datasource.ExternalTable;
-
-import java.util.concurrent.ExecutorService;
-
-public class IcebergMetadataCacheMgr {
-
- private IcebergMetadataCache icebergMetadataCache;
-
- public IcebergMetadataCacheMgr(ExecutorService executor) {
- this.icebergMetadataCache = new IcebergMetadataCache(executor);
- }
-
- public IcebergMetadataCache getIcebergMetadataCache() {
- return icebergMetadataCache;
- }
-
- public void removeCache(long catalogId) {
- icebergMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateCatalogCache(long catalogId) {
- icebergMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateTableCache(ExternalTable dorisTable) {
- icebergMetadataCache.invalidateTableCache(dorisTable);
- }
-
- public void invalidateDbCache(long catalogId, String dbName) {
- icebergMetadataCache.invalidateDbCache(catalogId, dbName);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index b38477d98d3..cdd6995c3a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -735,7 +735,7 @@ public class IcebergUtils {
public static Table getIcebergTable(ExternalTable dorisTable) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
- .getIcebergMetadataCache().getIcebergTable(dorisTable);
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog()).getIcebergTable(dorisTable);
}
public static org.apache.iceberg.types.Type dorisTypeToIcebergType(Type
type) {
@@ -989,7 +989,7 @@ public class IcebergUtils {
// Meanwhile, it will trigger iceberg metadata cache to load the
table, so we can get it next time.
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
- .getIcebergMetadataCache()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
tbl.getCatalog())
.getIcebergTable(tbl);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
@@ -1496,7 +1496,8 @@ public class IcebergUtils {
new IcebergSnapshot(info.getSnapshotId(),
info.getSchemaId()));
} else {
// Otherwise, use the latest snapshot and the latest schema.
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+ return Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog())
.getSnapshotCache(dorisTable);
}
}
@@ -1520,7 +1521,8 @@ public class IcebergUtils {
}
public static View getIcebergView(ExternalTable dorisTable) {
- IcebergMetadataCache metadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
+ IcebergMetadataCache metadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog());
return metadataCache.getIcebergView(dorisTable);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 78f7ece1a7a..6d0999ff44c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
@@ -54,10 +55,8 @@ public class IcebergApiSource implements IcebergSource {
throw new UnsupportedOperationException("IcebergApiSource does not
support view");
}
this.icebergExtTable = table;
-
- this.originTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
- icebergExtTable);
-
+ this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
table.getCatalog()).getIcebergTable(icebergExtTable);
this.desc = desc;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 2e68c775a3e..8d18d3e211e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergUtils;
public class IcebergHMSSource implements IcebergSource {
@@ -36,8 +37,8 @@ public class IcebergHMSSource implements IcebergSource {
this.hmsTable = hmsTable;
this.desc = desc;
this.icebergTable =
-
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
- .getIcebergTable(hmsTable);
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
hmsTable.getCatalog()).getIcebergTable(hmsTable);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index fb4f5d6ad17..461f00304f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -1555,7 +1555,7 @@ public class MetadataGenerator {
fillBatch(dataBatch,
hudiMetadataCacheMgr.getCacheStats(catalog), catalog.getName());
} else if (catalogIf instanceof IcebergExternalCatalog) {
// 3. iceberg cache
- IcebergMetadataCache icebergCache =
mgr.getIcebergMetadataCache();
+ IcebergMetadataCache icebergCache =
mgr.getIcebergMetadataCache((IcebergExternalCatalog) catalogIf);
fillBatch(dataBatch, icebergCache.getCacheStats(),
catalogIf.getName());
}
}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
new file mode 100644
index 00000000000..fa0142089a1
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,external_docker_doris") {
+ String catalog_name = "test_iceberg_meta_cache"
+ String catalog_name_no_cache = "test_iceberg_meta_no_cache"
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ for (String hivePrefix : ["hive2"]) {
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String hmsPort = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
+ String hdfs_port = context.config.otherConfigs.get(hivePrefix +
"HdfsPort")
+ String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+ String warehouse = "${default_fs}/warehouse"
+
+ // 1. test default catalog
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog ${catalog_name} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}'
+ );
+ """
+ sql """switch ${catalog_name}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """refresh table test_iceberg_meta_cache_db.sales"""
+ // select 3 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+
+ // 2. test catalog with iceberg.table.meta.cache.ttl-second
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ test {
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}',
+ 'iceberg.table.meta.cache.ttl-second' = '-2'
+ );
+ """
+ exception "is wrong"
+ }
+
+ // disable iceberg table meta cache
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}',
+ 'iceberg.table.meta.cache.ttl-second' = '0'
+ );
+ """
+ sql """switch ${catalog_name_no_cache}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // select 2 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // select 3 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+
+ // test modify ttl property
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ // 1. create catalog with default property first
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}'
+ );
+ """
+ sql """switch ${catalog_name_no_cache}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // alter wrong catalog property
+ test {
+ sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "-2")"""
+ exception "is wrong"
+ }
+ // alter catalog property, disable meta cache
+ sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "0")"""
+ // select 2 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(3,
2.0)"""
+ // select 3 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // select 4 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]