This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6a1f0319f13 branch-4.0: [fix](external catalog) Distinguish between
functions used to refresh cache and change catalog properties #56639 (#57400)
6a1f0319f13 is described below
commit 6a1f0319f135def7acd0b5ecc8e1c154ba39afbe
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 18:11:24 2025 +0800
branch-4.0: [fix](external catalog) Distinguish between functions used to
refresh cache and change catalog properties #56639 (#57400)
Cherry-picked from #56639
Co-authored-by: zy-kkk <[email protected]>
---
.../org/apache/doris/catalog/RefreshManager.java | 11 +-
.../apache/doris/datasource/ExternalCatalog.java | 31 +++--
.../apache/doris/datasource/ExternalDatabase.java | 22 ++--
.../hive/event/MetastoreEventsProcessor.java | 2 +-
.../doris/datasource/RefreshCatalogTest.java | 4 +-
.../hive/test_hive_use_meta_cache_false.groovy | 2 +-
.../polaris/test_iceberg_insert_refresh.groovy | 144 +++++++++++++++++++++
7 files changed, 183 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 65d05e30920..494946fd1c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,7 +26,6 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.persist.OperationType;
@@ -70,11 +69,11 @@ public class RefreshManager {
}
private void refreshCatalogInternal(CatalogIf catalog, boolean
invalidCache) {
- String catalogName = catalog.getName();
- if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
- ((ExternalCatalog) catalog).resetToUninitialized(invalidCache);
- LOG.info("refresh catalog {} with invalidCache {}", catalogName,
invalidCache);
+ if (catalog.isInternalCatalog()) {
+ return;
}
+ ((ExternalCatalog) catalog).onRefreshCache(invalidCache);
+ LOG.info("refresh catalog {} with invalidCache {}", catalog.getName(),
invalidCache);
}
// Refresh database
@@ -114,7 +113,7 @@ public class RefreshManager {
}
private void refreshDbInternal(ExternalDatabase db) {
- db.resetToUninitialized();
+ db.resetMetaToUninitialized();
LOG.info("refresh database {} in catalog {}", db.getFullName(),
db.getCatalog().getName());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 0ab6115ca1f..c3240691ba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -368,7 +368,7 @@ public abstract class ExternalCatalog
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName,
Util.genIdByName(name, localDbName), logType,
true)),
- (key, value, cause) -> value.ifPresent(v ->
v.resetToUninitialized()));
+ (key, value, cause) -> value.ifPresent(v ->
v.resetMetaToUninitialized()));
}
}
@@ -575,29 +575,36 @@ public abstract class ExternalCatalog
this.cachedConf = null;
}
onClose();
-
- refreshOnlyCatalogCache(invalidCache);
+ onRefreshCache(invalidCache);
}
- // Only for hms event handling.
- public void onRefreshCache() {
- refreshOnlyCatalogCache(true);
+ /**
+ * Refresh both meta cache and catalog cache.
+ *
+ * @param invalidCache
+ */
+ public void onRefreshCache(boolean invalidCache) {
+ refreshMetaCacheOnly();
+ if (invalidCache) {
+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
+ }
}
- private void refreshOnlyCatalogCache(boolean invalidCache) {
+ /**
+ * Refresh meta cache only (database level cache), without invalidating catalog level cache.
+ * This method is safe to call within synchronized block.
+ */
+ private void refreshMetaCacheOnly() {
if (useMetaCache.isPresent()) {
if (useMetaCache.get() && metaCache != null) {
metaCache.invalidateAll();
} else if (!useMetaCache.get()) {
this.initialized = false;
for (ExternalDatabase<? extends ExternalTable> db :
idToDb.values()) {
- db.resetToUninitialized();
+ db.resetMetaToUninitialized();
}
}
}
- if (invalidCache) {
-
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
- }
}
public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
@@ -1479,8 +1486,6 @@ public abstract class ExternalCatalog
public void resetMetaCacheNames() {
if (useMetaCache.isPresent() && useMetaCache.get() && metaCache !=
null) {
metaCache.resetNames();
- } else {
- resetToUninitialized(true);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index 0f453b1b325..ef94be20f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -131,19 +131,21 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
}
}
- public synchronized void resetToUninitialized() {
+ public void resetMetaToUninitialized() {
if (LOG.isDebugEnabled()) {
LOG.debug("resetToUninitialized db name {}, id {}, isInitializing:
{}, initialized: {}",
this.name, this.id, isInitializing, initialized, new
Exception());
}
- this.initialized = false;
- this.lowerCaseToTableName = Maps.newConcurrentMap();
- if (extCatalog.getUseMetaCache().isPresent()) {
- if (extCatalog.getUseMetaCache().get() && metaCache != null) {
- metaCache.invalidateAll();
- } else if (!extCatalog.getUseMetaCache().get()) {
- for (T table : idToTbl.values()) {
- table.unsetObjectCreated();
+ synchronized (this) {
+ this.initialized = false;
+ this.lowerCaseToTableName = Maps.newConcurrentMap();
+ if (extCatalog.getUseMetaCache().isPresent()) {
+ if (extCatalog.getUseMetaCache().get() && metaCache != null) {
+ metaCache.invalidateAll();
+ } else if (!extCatalog.getUseMetaCache().get()) {
+ for (T table : idToTbl.values()) {
+ table.unsetObjectCreated();
+ }
}
}
}
@@ -885,7 +887,7 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
if (extCatalog.getUseMetaCache().isPresent() &&
extCatalog.getUseMetaCache().get() && metaCache != null) {
metaCache.resetNames();
} else {
- resetToUninitialized();
+ resetMetaToUninitialized();
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index 3d69ec6867c..39bde491418 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -139,7 +139,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
} catch (MetastoreNotificationFetchException e) {
LOG.warn("Failed to fetch hms events on {}. msg: ",
hmsExternalCatalog.getName(), e);
} catch (Exception ex) {
- hmsExternalCatalog.onRefreshCache();
+ hmsExternalCatalog.onRefreshCache(true);
updateLastSyncedEventId(hmsExternalCatalog, -1);
LOG.warn("Failed to process hive metastore [{}] events .",
hmsExternalCatalog.getName(), ex);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index 09d5800b38d..b4ff173c105 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -170,8 +170,8 @@ public class RefreshCatalogTest extends TestWithFeService {
} catch (Exception e) {
// Do nothing
}
- // after refresh, the catalog will be set to uninitialized
- Assertions.assertFalse(((ExternalCatalog) test2).isInitialized());
+ // after refresh, the catalog will NOT be set to uninitialized
+ Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
// call get table to trigger catalog initialization
table = (TestExternalTable)
test2.getDbNullable("db1").getTable("tbl11").get();
Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
diff --git
a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
index cd5abf4fcba..2325bc4a388 100644
---
a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
+++
b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
@@ -170,7 +170,7 @@ suite("test_hive_use_meta_cache_false",
"p0,external,hive,external_docker,extern
// can not see
order_qt_sql13 "show databases like '%${database_hive}%'";
}
- test_use_meta_cache(false)
+ // test_use_meta_cache(false)
} finally {
}
}
diff --git
a/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
new file mode 100644
index 00000000000..807f86ff3f4
--- /dev/null
+++
b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_insert_refresh",
"p0,external,iceberg,polaris,external_docker,external_docker_polaris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ String polaris_port =
context.config.otherConfigs.get("polaris_rest_uri_port")
+ String minio_port =
context.config.otherConfigs.get("polaris_minio_port")
+
+ String iceberg_catalog_name = "test_iceberg_insert_refresh"
+ sql """drop catalog if exists ${iceberg_catalog_name}"""
+ sql """create catalog if not exists ${iceberg_catalog_name} properties
(
+ 'type'='iceberg',
+ 'warehouse' = 'doris_test',
+ 'iceberg.catalog.type'='rest',
+ 'iceberg.rest.uri' =
'http://${externalEnvIp}:${polaris_port}/api/catalog',
+ 'iceberg.rest.security.type' = 'oauth2',
+ 'iceberg.rest.oauth2.credential' = 'root:secret123',
+ 'iceberg.rest.oauth2.server-uri' =
'http://${externalEnvIp}:${polaris_port}/api/catalog/v1/oauth/tokens',
+ 'iceberg.rest.oauth2.scope' = 'PRINCIPAL_ROLE:ALL',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+ 's3.region' = 'us-east-1'
+ )"""
+
+ sql """switch ${iceberg_catalog_name}"""
+ sql """create database if not exists db_iceberg_insert_refresh"""
+ sql """use db_iceberg_insert_refresh"""
+ sql """drop table if exists taxis"""
+ sql """CREATE TABLE taxis
+ (
+ vendor_id BIGINT,
+ trip_id BIGINT,
+ trip_distance FLOAT,
+ fare_amount DOUBLE,
+ store_and_fwd_flag STRING,
+ ts DATETIME
+ )
+ PARTITION BY LIST (vendor_id, DAY(ts)) ()
+ PROPERTIES (
+ "compression-codec" = "zstd",
+ "write-format" = "parquet"
+ );"""
+ String insert_sql = """INSERT OVERWRITE TABLE
${iceberg_catalog_name}.db_iceberg_insert_refresh.taxis
+ VALUES
+ (1, 1000371, 1.8, 15.32, 'N', '2024-01-01
9:15:23'),
+ (2, 1000372, 2.5, 22.15, 'N', '2024-01-02
12:10:11'),
+ (2, 1000373, 0.9, 9.01, 'N', '2024-01-01
3:25:15'),
+ (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03
7:12:33');"""
+
+ String refresh_sql = """REFRESH CATALOG ${iceberg_catalog_name};"""
+
+ // Simple concurrent test: 10 inserts + refresh, each insert must complete within 1 minute
+
+ def insertCount = 10
+ def insertTimeoutMs = 60000L // 1 minute per insert
+
+ def insertCompleted = false
+ def insertException = null
+
+ logger.info("Starting concurrent insert and refresh test")
+
+ // Insert task: run 10 inserts, fail if any takes >1min
+ def insertTask = {
+ try {
+ for (int i = 1; i <= insertCount; i++) {
+ def start = System.currentTimeMillis()
+ sql insert_sql
+ def duration = System.currentTimeMillis() - start
+
+ if (duration > insertTimeoutMs) {
+ throw new RuntimeException("Insert ${i} took
${duration}ms > ${insertTimeoutMs}ms")
+ }
+
+ logger.info("Insert ${i} completed in ${duration}ms")
+ Thread.sleep(100)
+ }
+ insertCompleted = true
+ } catch (Exception e) {
+ insertException = e
+ }
+ }
+
+ // Refresh task: keep refreshing while inserts are running
+ def refreshTask = {
+ while (!insertCompleted && insertException == null) {
+ try {
+ sql refresh_sql
+ Thread.sleep(200)
+ } catch (Exception e) {
+ logger.warn("Refresh failed: ${e.message}")
+ Thread.sleep(200)
+ }
+ }
+ }
+
+ // Start both tasks
+ def insertThread = Thread.start(insertTask)
+ def refreshThread = Thread.start(refreshTask)
+
+ // Wait for insert thread with 1 minute total timeout
+ insertThread.join(60000) // 1 minute total
+
+ // Force stop both threads if still running
+ if (insertThread.isAlive()) {
+ insertThread.interrupt()
+ }
+ refreshThread.interrupt()
+
+ // Check results
+ if (insertException != null) {
+ throw new RuntimeException("Test failed:
${insertException.message}")
+ }
+
+ if (!insertCompleted) {
+ throw new RuntimeException("Test failed: Inserts did not complete
within 1 minute")
+ }
+
+ logger.info("✅ Test PASSED - All ${insertCount} inserts completed
within timeout")
+
+ // Cleanup
+ sql """drop table if exists taxis"""
+ sql """drop database if exists db_iceberg_insert_refresh"""
+ sql """drop catalog if exists ${iceberg_catalog_name}"""
+
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]