This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aef789fb2a5 [Enhancement](iceberg) add iceberg expire cache params in
catalog property (#57187)
aef789fb2a5 is described below
commit aef789fb2a5bcc95b066b4c518b6dd2150ecc088
Author: yaoxiao <[email protected]>
AuthorDate: Fri Dec 19 10:37:59 2025 +0800
[Enhancement](iceberg) add iceberg expire cache params in catalog property
(#57187)
### What problem does this PR solve?
Issue Number: #57228
Related PR: #xxx
Problem Summary:
There are several problems:
1. When new data is inserted into an Iceberg table, Doris cannot retrieve
it until the table meta cache is refreshed.
2. When an Iceberg table's schema is altered, Doris cannot detect the
change until both the table meta cache and the snapshot meta cache are
refreshed. For example, if we add a new column and insert data into it
using Spark, Doris cannot see the column or the new data until both
caches are refreshed.
3. Both the table meta cache and the snapshot meta cache have a default
TTL of 10 minutes, which is too long.
We can add two parameters to the catalog properties:
**iceberg.table.meta.cache.ttl-second** and
**iceberg.snapshot.meta.cache.ttl-second**.
These parameters control the expiration of the Iceberg table meta cache
and the snapshot meta cache.
1. Regarding the "iceberg.table.meta.cache.ttl-second" parameter, we can
create an Iceberg catalog like this:
CREATE CATALOG iceberg
PROPERTIES (
"type" = "iceberg",
"iceberg.catalog.type" = "hms",
"hive.metastore.uris" = "thrift://localhost:9083",
**"iceberg.table.meta.cache.ttl-second" = "0"**
);
When we set "iceberg.table.meta.cache.ttl-second" = "0", Doris retrieves
the latest data immediately without using the cache. Alternatively, we
can set "iceberg.table.meta.cache.ttl-second" to a smaller value to
shorten the cache interval.
2. Regarding the "iceberg.snapshot.meta.cache.ttl-second" parameter, we
can create an Iceberg catalog like this:
CREATE CATALOG iceberg
PROPERTIES (
"type" = "iceberg",
"iceberg.catalog.type" = "hms",
"hive.metastore.uris" = "thrift://localhost:9083",
**"iceberg.table.meta.cache.ttl-second" = "0",**
**"iceberg.snapshot.meta.cache.ttl-second" = "0"**
);
When we set "iceberg.table.meta.cache.ttl-second" = "0" and
"iceberg.snapshot.meta.cache.ttl-second" = "0", Doris detects schema
changes immediately (using DESC or SELECT). If we add a new column and
insert data into it using Spark, Doris detects the column and the new
data immediately.
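The TTL can also be changed on an existing catalog; the added regression
test exercises this path with ALTER CATALOG. A minimal sketch (the
catalog name "iceberg" is only illustrative):
ALTER CATALOG iceberg SET PROPERTIES
("iceberg.table.meta.cache.ttl-second" = "0");
"iceberg.snapshot.meta.cache.ttl-second" can be updated the same way.
An invalid value such as "-2" is rejected with a DdlException, and when a
TTL property is not set the cache falls back to
Config.external_cache_expire_time_seconds_after_access.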
Co-authored-by: yaoxiao <[email protected]>
---
.../doris/datasource/ExternalMetaCacheMgr.java | 69 ++++++---
.../doris/datasource/hive/HMSExternalTable.java | 5 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 32 ++++
.../datasource/iceberg/IcebergMetadataCache.java | 43 ++++--
.../iceberg/IcebergMetadataCacheMgr.java | 51 -------
.../doris/datasource/iceberg/IcebergUtils.java | 10 +-
.../iceberg/source/IcebergApiSource.java | 7 +-
.../iceberg/source/IcebergHMSSource.java | 5 +-
.../doris/tablefunction/MetadataGenerator.java | 2 +-
.../iceberg/test_iceberg_table_meta_cache.groovy | 168 +++++++++++++++++++++
10 files changed, 293 insertions(+), 99 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 4932f3aa8f9..5b568ff491b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -30,8 +30,8 @@ import
org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
-import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
@@ -89,7 +89,10 @@ public class ExternalMetaCacheMgr {
private ExecutorService scheduleExecutor;
// catalog id -> HiveMetaStoreCache
- private final Map<Long, HiveMetaStoreCache> cacheMap =
Maps.newConcurrentMap();
+ private final Map<Long, HiveMetaStoreCache> hiveCacheMap =
Maps.newConcurrentMap();
+
+ // catalog id -> IcebergMetadataCache
+ private final Map<Long, IcebergMetadataCache> icebergCacheMap =
Maps.newConcurrentMap();
// catalog id -> table schema cache
private final Map<Long, ExternalSchemaCache> schemaCacheMap =
Maps.newHashMap();
// hudi partition manager
@@ -98,7 +101,6 @@ public class ExternalMetaCacheMgr {
private FileSystemCache fsCache;
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
- private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
@@ -130,7 +132,6 @@ public class ExternalMetaCacheMgr {
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
- icebergMetadataCacheMgr = new
IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new
PaimonMetadataCacheMgr(commonRefreshExecutor);
dorisExternalMetaCacheMgr = new
DorisExternalMetaCacheMgr(commonRefreshExecutor);
@@ -160,14 +161,14 @@ public class ExternalMetaCacheMgr {
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
- HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
+ HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId());
if (cache == null) {
- synchronized (cacheMap) {
- if (!cacheMap.containsKey(catalog.getId())) {
- cacheMap.put(catalog.getId(),
+ synchronized (hiveCacheMap) {
+ if (!hiveCacheMap.containsKey(catalog.getId())) {
+ hiveCacheMap.put(catalog.getId(),
new HiveMetaStoreCache(catalog,
commonRefreshExecutor, fileListingExecutor));
}
- cache = cacheMap.get(catalog.getId());
+ cache = hiveCacheMap.get(catalog.getId());
}
}
return cache;
@@ -202,8 +203,17 @@ public class ExternalMetaCacheMgr {
return hudiMetadataCacheMgr;
}
- public IcebergMetadataCache getIcebergMetadataCache() {
- return icebergMetadataCacheMgr.getIcebergMetadataCache();
+ public IcebergMetadataCache getIcebergMetadataCache(IcebergExternalCatalog
catalog) {
+ IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId());
+ if (cache == null) {
+ synchronized (icebergCacheMap) {
+ if (!icebergCacheMap.containsKey(catalog.getId())) {
+ icebergCacheMap.put(catalog.getId(), new
IcebergMetadataCache(catalog, commonRefreshExecutor));
+ }
+ cache = icebergCacheMap.get(catalog.getId());
+ }
+ }
+ return cache;
}
public PaimonMetadataCache getPaimonMetadataCache() {
@@ -227,7 +237,7 @@ public class ExternalMetaCacheMgr {
}
public void removeCache(long catalogId) {
- if (cacheMap.remove(catalogId) != null) {
+ if (hiveCacheMap.remove(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);
}
synchronized (schemaCacheMap) {
@@ -235,8 +245,10 @@ public class ExternalMetaCacheMgr {
LOG.info("remove schema cache for catalog {}", catalogId);
}
}
+ if (icebergCacheMap.remove(catalogId) != null) {
+ LOG.info("remove iceberg meta cache for catalog {}", catalogId);
+ }
hudiMetadataCacheMgr.removeCache(catalogId);
- icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
dorisExternalMetaCacheMgr.removeCache(catalogId);
@@ -249,12 +261,15 @@ public class ExternalMetaCacheMgr {
schemaCache.invalidateTableCache(dorisTable);
}
}
- HiveMetaStoreCache metaCache =
cacheMap.get(dorisTable.getCatalog().getId());
- if (metaCache != null) {
- metaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+ HiveMetaStoreCache hiveMetaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
+ if (hiveMetaCache != null) {
+
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+ }
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(dorisTable.getCatalog().getId());
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateTableCache(dorisTable);
}
hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
- icebergMetadataCacheMgr.invalidateTableCache(dorisTable);
maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
paimonMetadataCacheMgr.invalidateTableCache(dorisTable);
if (LOG.isDebugEnabled()) {
@@ -271,12 +286,15 @@ public class ExternalMetaCacheMgr {
schemaCache.invalidateDbCache(dbName);
}
}
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateDbCache(catalogId, dbName);
+ }
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
- icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
if (LOG.isDebugEnabled()) {
@@ -288,12 +306,15 @@ public class ExternalMetaCacheMgr {
synchronized (schemaCacheMap) {
schemaCacheMap.remove(catalogId);
}
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.invalidateAll();
}
+ IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ if (icebergMetadataCache != null) {
+ icebergMetadataCache.invalidateCatalogCache(catalogId);
+ }
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
- icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
@@ -310,7 +331,7 @@ public class ExternalMetaCacheMgr {
public void addPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
@@ -328,7 +349,7 @@ public class ExternalMetaCacheMgr {
public void dropPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
if (metaCache != null) {
metaCache.dropPartitionsCache(table, partitionNames, true);
}
@@ -338,7 +359,7 @@ public class ExternalMetaCacheMgr {
}
public void invalidatePartitionsCache(ExternalTable dorisTable,
List<String> partitionNames) {
- HiveMetaStoreCache metaCache =
cacheMap.get(dorisTable.getCatalog().getId());
+ HiveMetaStoreCache metaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
if (metaCache != null) {
for (String partitionName : partitionNames) {
metaCache.invalidatePartitionCache(dorisTable, partitionName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 5fb1b791ab6..8e8556c8716 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
import org.apache.doris.datasource.iceberg.IcebergUtils;
@@ -815,7 +816,9 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
case ICEBERG:
if (GlobalVariable.enableFetchIcebergStats) {
return StatisticsUtil.getIcebergColumnStats(colName,
-
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+
.getIcebergMetadataCache((IcebergExternalCatalog) this.getCatalog())
+ .getIcebergTable(this));
} else {
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 08186858f65..57d80c804c0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -32,9 +32,12 @@ import
org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFieldOp;
import
org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionFieldOp;
import org.apache.doris.transaction.TransactionManagerFactory;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.catalog.Catalog;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -46,6 +49,8 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String ICEBERG_DLF = "dlf";
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
+ public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND =
"iceberg.table.meta.cache.ttl-second";
+ public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND =
"iceberg.snapshot.meta.cache.ttl-second";
protected String icebergCatalogType;
protected Catalog catalog;
@@ -73,9 +78,36 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
@Override
public void checkProperties() throws DdlException {
super.checkProperties();
+ // check iceberg.table.meta.cache.ttl-second parameter
+ String tableMetaCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(tableMetaCacheTtlSecond) &&
NumberUtils.toInt(tableMetaCacheTtlSecond, CACHE_NO_TTL)
+ < CACHE_TTL_DISABLE_CACHE) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_TABLE_META_CACHE_TTL_SECOND + "
is wrong, value is "
+ + tableMetaCacheTtlSecond);
+ }
+
+ // check iceberg.snapshot.meta.cache.ttl-second parameter
+ String partitionCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(partitionCacheTtlSecond) &&
NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
+ < CACHE_TTL_DISABLE_CACHE) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND
+ " is wrong, value is "
+ + partitionCacheTtlSecond);
+ }
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}
+ @Override
+ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
+ super.notifyPropertiesUpdated(updatedProps);
+ String tableMetaCacheTtl =
updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
+ String snapshotMetaCacheTtl =
updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(tableMetaCacheTtl) ||
Objects.nonNull(snapshotMetaCacheTtl)) {
+
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
+ }
+ }
+
@Override
protected synchronized void initPreExecutionAuthenticator() {
if (executionAuthenticator == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 8ae6ce092c0..0d49965d44a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -51,37 +52,55 @@ import java.util.concurrent.ExecutorService;
public class IcebergMetadataCache {
private static final Logger LOG =
LogManager.getLogger(IcebergMetadataCache.class);
+ private final ExecutorService executor;
+ private final IcebergExternalCatalog catalog;
+ private LoadingCache<IcebergMetadataCacheKey, List<Snapshot>>
snapshotListCache;
+ private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
+ private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue>
snapshotCache;
+ private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+
+ public IcebergMetadataCache(IcebergExternalCatalog catalog,
ExecutorService executor) {
+ this.executor = executor;
+ this.catalog = catalog;
+ init();
+ }
+
+ public void init() {
+ long tableMetaCacheTtlSecond = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
+ ExternalCatalog.CACHE_NO_TTL);
- private final LoadingCache<IcebergMetadataCacheKey, List<Snapshot>>
snapshotListCache;
- private final LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
- private final LoadingCache<IcebergMetadataCacheKey,
IcebergSnapshotCacheValue> snapshotCache;
- private final LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+ long snapshotMetaCacheTtlSecond = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND),
+ ExternalCatalog.CACHE_NO_TTL);
- public IcebergMetadataCache(ExecutorService executor) {
CacheFactory snapshotListCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(snapshotMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? snapshotMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.snapshotListCache = snapshotListCacheFactory.buildCache(key ->
loadSnapshots(key), executor);
+ this.snapshotListCache =
snapshotListCacheFactory.buildCache(this::loadSnapshots, executor);
CacheFactory tableCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(tableMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? tableMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key),
executor);
+ this.tableCache = tableCacheFactory.buildCache(this::loadTable,
executor);
CacheFactory snapshotCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(snapshotMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
+ ? snapshotMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
Config.max_external_table_cache_num,
true,
null);
- this.snapshotCache = snapshotCacheFactory.buildCache(key ->
loadSnapshot(key), executor);
- this.viewCache = tableCacheFactory.buildCache(key -> loadView(key),
executor);
+ this.snapshotCache =
snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
+ this.viewCache = tableCacheFactory.buildCache(this::loadView,
executor);
}
public Table getIcebergTable(ExternalTable dorisTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
deleted file mode 100644
index 6e72dcf2fac..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg;
-
-import org.apache.doris.datasource.ExternalTable;
-
-import java.util.concurrent.ExecutorService;
-
-public class IcebergMetadataCacheMgr {
-
- private IcebergMetadataCache icebergMetadataCache;
-
- public IcebergMetadataCacheMgr(ExecutorService executor) {
- this.icebergMetadataCache = new IcebergMetadataCache(executor);
- }
-
- public IcebergMetadataCache getIcebergMetadataCache() {
- return icebergMetadataCache;
- }
-
- public void removeCache(long catalogId) {
- icebergMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateCatalogCache(long catalogId) {
- icebergMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateTableCache(ExternalTable dorisTable) {
- icebergMetadataCache.invalidateTableCache(dorisTable);
- }
-
- public void invalidateDbCache(long catalogId, String dbName) {
- icebergMetadataCache.invalidateDbCache(catalogId, dbName);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index da8ddc9abd9..001a9a85903 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -735,7 +735,7 @@ public class IcebergUtils {
public static Table getIcebergTable(ExternalTable dorisTable) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
- .getIcebergMetadataCache().getIcebergTable(dorisTable);
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog()).getIcebergTable(dorisTable);
}
public static org.apache.iceberg.types.Type dorisTypeToIcebergType(Type
type) {
@@ -989,7 +989,7 @@ public class IcebergUtils {
// Meanwhile, it will trigger iceberg metadata cache to load the
table, so we can get it next time.
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
- .getIcebergMetadataCache()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
tbl.getCatalog())
.getIcebergTable(tbl);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
@@ -1496,7 +1496,8 @@ public class IcebergUtils {
new IcebergSnapshot(info.getSnapshotId(),
info.getSchemaId()));
} else {
// Otherwise, use the latest snapshot and the latest schema.
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+ return Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog())
.getSnapshotCache(dorisTable);
}
}
@@ -1520,7 +1521,8 @@ public class IcebergUtils {
}
public static View getIcebergView(ExternalTable dorisTable) {
- IcebergMetadataCache metadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
+ IcebergMetadataCache metadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
dorisTable.getCatalog());
return metadataCache.getIcebergView(dorisTable);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 78f7ece1a7a..6d0999ff44c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
@@ -54,10 +55,8 @@ public class IcebergApiSource implements IcebergSource {
throw new UnsupportedOperationException("IcebergApiSource does not
support view");
}
this.icebergExtTable = table;
-
- this.originTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
- icebergExtTable);
-
+ this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
table.getCatalog()).getIcebergTable(icebergExtTable);
this.desc = desc;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 2e68c775a3e..8d18d3e211e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergUtils;
public class IcebergHMSSource implements IcebergSource {
@@ -36,8 +37,8 @@ public class IcebergHMSSource implements IcebergSource {
this.hmsTable = hmsTable;
this.desc = desc;
this.icebergTable =
-
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
- .getIcebergTable(hmsTable);
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog)
hmsTable.getCatalog()).getIcebergTable(hmsTable);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 06b4399d1bb..1fda1c838dc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -1586,7 +1586,7 @@ public class MetadataGenerator {
fillBatch(dataBatch,
hudiMetadataCacheMgr.getCacheStats(catalog), catalog.getName());
} else if (catalogIf instanceof IcebergExternalCatalog) {
// 3. iceberg cache
- IcebergMetadataCache icebergCache =
mgr.getIcebergMetadataCache();
+ IcebergMetadataCache icebergCache =
mgr.getIcebergMetadataCache((IcebergExternalCatalog) catalogIf);
fillBatch(dataBatch, icebergCache.getCacheStats(),
catalogIf.getName());
}
}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
new file mode 100644
index 00000000000..fa0142089a1
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,external_docker_doris") {
+ String catalog_name = "test_iceberg_meta_cache"
+ String catalog_name_no_cache = "test_iceberg_meta_no_cache"
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ for (String hivePrefix : ["hive2"]) {
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String hmsPort = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
+ String hdfs_port = context.config.otherConfigs.get(hivePrefix +
"HdfsPort")
+ String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+ String warehouse = "${default_fs}/warehouse"
+
+ // 1. test default catalog
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog ${catalog_name} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}'
+ );
+ """
+ sql """switch ${catalog_name}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """refresh table test_iceberg_meta_cache_db.sales"""
+ // select 3 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+
+ // 2. test catalog with iceberg.table.meta.cache.ttl-second
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ test {
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}',
+ 'iceberg.table.meta.cache.ttl-second' = '-2'
+ );
+ """
+ exception "is wrong"
+ }
+
+ // disable iceberg table meta cache
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}',
+ 'iceberg.table.meta.cache.ttl-second' = '0'
+ );
+ """
+ sql """switch ${catalog_name_no_cache}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // select 2 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // select 3 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+
+ // test modify ttl property
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ // 1. create catalog with default property fisrt
+ sql """
+ create catalog ${catalog_name_no_cache} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'fs.defaultFS' = '${default_fs}',
+ 'warehouse' = '${warehouse}'
+ );
+ """
+ sql """switch ${catalog_name_no_cache}"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """create database test_iceberg_meta_cache_db"""
+ sql """
+ CREATE TABLE test_iceberg_meta_cache_db.sales (
+ id INT,
+ amount DOUBLE
+ );
+ """
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
2.0)"""
+ // select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(2,
2.0)"""
+ // still select 1 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // alter wrong catalog property
+ test {
+ sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "-2")"""
+ exception "is wrong"
+ }
+ // alter catalog property, disable meta cache
+ sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "0")"""
+ // select 2 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(3,
2.0)"""
+ // select 3 row
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+
+ // insert into new value
+ sql """insert into test_iceberg_meta_cache_db.sales values(1,
3.0)"""
+ // select 4 rows
+ sql """select * from test_iceberg_meta_cache_db.sales"""
+ sql """drop table test_iceberg_meta_cache_db.sales"""
+ sql """drop database if exists test_iceberg_meta_cache_db"""
+ sql """drop catalog if exists ${catalog_name_no_cache};"""
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]