This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6a1f0319f13 branch-4.0: [fix](external catalog) Distinguish between 
functions used to refresh cache and change catalog properties #56639 (#57400)
6a1f0319f13 is described below

commit 6a1f0319f135def7acd0b5ecc8e1c154ba39afbe
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 18:11:24 2025 +0800

    branch-4.0: [fix](external catalog) Distinguish between functions used to 
refresh cache and change catalog properties #56639 (#57400)
    
    Cherry-picked from #56639
    
    Co-authored-by: zy-kkk <[email protected]>
---
 .../org/apache/doris/catalog/RefreshManager.java   |  11 +-
 .../apache/doris/datasource/ExternalCatalog.java   |  31 +++--
 .../apache/doris/datasource/ExternalDatabase.java  |  22 ++--
 .../hive/event/MetastoreEventsProcessor.java       |   2 +-
 .../doris/datasource/RefreshCatalogTest.java       |   4 +-
 .../hive/test_hive_use_meta_cache_false.groovy     |   2 +-
 .../polaris/test_iceberg_insert_refresh.groovy     | 144 +++++++++++++++++++++
 7 files changed, 183 insertions(+), 33 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 65d05e30920..494946fd1c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,7 +26,6 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.persist.OperationType;
 
@@ -70,11 +69,11 @@ public class RefreshManager {
     }
 
     private void refreshCatalogInternal(CatalogIf catalog, boolean 
invalidCache) {
-        String catalogName = catalog.getName();
-        if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
-            ((ExternalCatalog) catalog).resetToUninitialized(invalidCache);
-            LOG.info("refresh catalog {} with invalidCache {}", catalogName, 
invalidCache);
+        if (catalog.isInternalCatalog()) {
+            return;
         }
+        ((ExternalCatalog) catalog).onRefreshCache(invalidCache);
+        LOG.info("refresh catalog {} with invalidCache {}", catalog.getName(), 
invalidCache);
     }
 
     // Refresh database
@@ -114,7 +113,7 @@ public class RefreshManager {
     }
 
     private void refreshDbInternal(ExternalDatabase db) {
-        db.resetToUninitialized();
+        db.resetMetaToUninitialized();
         LOG.info("refresh database {} in catalog {}", db.getFullName(), 
db.getCatalog().getName());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 0ab6115ca1f..c3240691ba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -368,7 +368,7 @@ public abstract class ExternalCatalog
                     localDbName -> Optional.ofNullable(
                             buildDbForInit(null, localDbName, 
Util.genIdByName(name, localDbName), logType,
                                     true)),
-                    (key, value, cause) -> value.ifPresent(v -> 
v.resetToUninitialized()));
+                    (key, value, cause) -> value.ifPresent(v -> 
v.resetMetaToUninitialized()));
         }
     }
 
@@ -575,29 +575,36 @@ public abstract class ExternalCatalog
             this.cachedConf = null;
         }
         onClose();
-
-        refreshOnlyCatalogCache(invalidCache);
+        onRefreshCache(invalidCache);
     }
 
-    // Only for hms event handling.
-    public void onRefreshCache() {
-        refreshOnlyCatalogCache(true);
+    /**
+     * Refresh the meta cache and, if requested, invalidate the catalog cache.
+     *
+     * @param invalidCache whether to also invalidate the catalog-level cache
+     */
+    public void onRefreshCache(boolean invalidCache) {
+        refreshMetaCacheOnly();
+        if (invalidCache) {
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
+        }
     }
 
-    private void refreshOnlyCatalogCache(boolean invalidCache) {
+    /**
+     * Refresh meta cache only (database level cache), without invalidating 
catalog level cache.
+     * This method is safe to call within a synchronized block.
+     */
+    private void refreshMetaCacheOnly() {
         if (useMetaCache.isPresent()) {
             if (useMetaCache.get() && metaCache != null) {
                 metaCache.invalidateAll();
             } else if (!useMetaCache.get()) {
                 this.initialized = false;
                 for (ExternalDatabase<? extends ExternalTable> db : 
idToDb.values()) {
-                    db.resetToUninitialized();
+                    db.resetMetaToUninitialized();
                 }
             }
         }
-        if (invalidCache) {
-            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
-        }
     }
 
     public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
@@ -1479,8 +1486,6 @@ public abstract class ExternalCatalog
     public void resetMetaCacheNames() {
         if (useMetaCache.isPresent() && useMetaCache.get() && metaCache != 
null) {
             metaCache.resetNames();
-        } else {
-            resetToUninitialized(true);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index 0f453b1b325..ef94be20f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -131,19 +131,21 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
         }
     }
 
-    public synchronized void resetToUninitialized() {
+    public void resetMetaToUninitialized() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("resetToUninitialized db name {}, id {}, isInitializing: 
{}, initialized: {}",
                     this.name, this.id, isInitializing, initialized, new 
Exception());
         }
-        this.initialized = false;
-        this.lowerCaseToTableName = Maps.newConcurrentMap();
-        if (extCatalog.getUseMetaCache().isPresent()) {
-            if (extCatalog.getUseMetaCache().get() && metaCache != null) {
-                metaCache.invalidateAll();
-            } else if (!extCatalog.getUseMetaCache().get()) {
-                for (T table : idToTbl.values()) {
-                    table.unsetObjectCreated();
+        synchronized (this) {
+            this.initialized = false;
+            this.lowerCaseToTableName = Maps.newConcurrentMap();
+            if (extCatalog.getUseMetaCache().isPresent()) {
+                if (extCatalog.getUseMetaCache().get() && metaCache != null) {
+                    metaCache.invalidateAll();
+                } else if (!extCatalog.getUseMetaCache().get()) {
+                    for (T table : idToTbl.values()) {
+                        table.unsetObjectCreated();
+                    }
                 }
             }
         }
@@ -885,7 +887,7 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
         if (extCatalog.getUseMetaCache().isPresent() && 
extCatalog.getUseMetaCache().get() && metaCache != null) {
             metaCache.resetNames();
         } else {
-            resetToUninitialized();
+            resetMetaToUninitialized();
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index 3d69ec6867c..39bde491418 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -139,7 +139,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
                 } catch (MetastoreNotificationFetchException e) {
                     LOG.warn("Failed to fetch hms events on {}. msg: ", 
hmsExternalCatalog.getName(), e);
                 } catch (Exception ex) {
-                    hmsExternalCatalog.onRefreshCache();
+                    hmsExternalCatalog.onRefreshCache(true);
                     updateLastSyncedEventId(hmsExternalCatalog, -1);
                     LOG.warn("Failed to process hive metastore [{}] events .",
                             hmsExternalCatalog.getName(), ex);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index 09d5800b38d..b4ff173c105 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -170,8 +170,8 @@ public class RefreshCatalogTest extends TestWithFeService {
         } catch (Exception e) {
             // Do nothing
         }
-        // after refresh, the catalog will be set to uninitialized
-        Assertions.assertFalse(((ExternalCatalog) test2).isInitialized());
+        // after refresh, the catalog will NOT be set to uninitialized
+        Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
         // call get table to trigger catalog initialization
         table = (TestExternalTable) 
test2.getDbNullable("db1").getTable("tbl11").get();
         Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
diff --git 
a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
 
b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
index cd5abf4fcba..2325bc4a388 100644
--- 
a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
+++ 
b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy
@@ -170,7 +170,7 @@ suite("test_hive_use_meta_cache_false", 
"p0,external,hive,external_docker,extern
                 // can not see
                 order_qt_sql13 "show databases like '%${database_hive}%'";
             }
-            test_use_meta_cache(false)
+            // test_use_meta_cache(false)
         } finally {
         }
     }
diff --git 
a/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
 
b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
new file mode 100644
index 00000000000..807f86ff3f4
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_insert_refresh", 
"p0,external,iceberg,polaris,external_docker,external_docker_polaris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+        String polaris_port = 
context.config.otherConfigs.get("polaris_rest_uri_port")
+        String minio_port = 
context.config.otherConfigs.get("polaris_minio_port")
+
+        String iceberg_catalog_name = "test_iceberg_insert_refresh"
+        sql """drop catalog if exists ${iceberg_catalog_name}"""
+        sql """create catalog if not exists ${iceberg_catalog_name} properties 
(
+            'type'='iceberg',
+            'warehouse' = 'doris_test',
+            'iceberg.catalog.type'='rest',
+            'iceberg.rest.uri' = 
'http://${externalEnvIp}:${polaris_port}/api/catalog',
+            'iceberg.rest.security.type' = 'oauth2',
+            'iceberg.rest.oauth2.credential' = 'root:secret123',
+            'iceberg.rest.oauth2.server-uri' = 
'http://${externalEnvIp}:${polaris_port}/api/catalog/v1/oauth/tokens',
+            'iceberg.rest.oauth2.scope' = 'PRINCIPAL_ROLE:ALL',
+            's3.access_key' = 'admin',
+            's3.secret_key' = 'password',
+            's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+            's3.region' = 'us-east-1'
+        )"""
+
+        sql """switch ${iceberg_catalog_name}"""
+        sql """create database if not exists db_iceberg_insert_refresh"""
+        sql """use db_iceberg_insert_refresh"""
+        sql """drop table if exists taxis"""
+        sql """CREATE TABLE taxis
+               (
+                   vendor_id BIGINT,
+                   trip_id BIGINT,
+                   trip_distance FLOAT,
+                   fare_amount DOUBLE,
+                   store_and_fwd_flag STRING,
+                   ts DATETIME
+               )
+               PARTITION BY LIST (vendor_id, DAY(ts)) ()
+               PROPERTIES (
+                   "compression-codec" = "zstd",
+                   "write-format" = "parquet"
+               );"""
+        String insert_sql = """INSERT OVERWRITE  TABLE 
${iceberg_catalog_name}.db_iceberg_insert_refresh.taxis
+                               VALUES
+                                (1, 1000371, 1.8, 15.32, 'N', '2024-01-01 
9:15:23'),
+                                (2, 1000372, 2.5, 22.15, 'N', '2024-01-02 
12:10:11'),
+                                (2, 1000373, 0.9, 9.01, 'N', '2024-01-01 
3:25:15'),
+                                (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 
7:12:33');"""
+
+        String refresh_sql = """REFRESH CATALOG ${iceberg_catalog_name};"""
+
+        // Simple concurrent test: 10 inserts + refresh, each insert must 
complete within 1 minute
+
+        def insertCount = 10
+        def insertTimeoutMs = 60000L // 1 minute per insert
+
+        def insertCompleted = false
+        def insertException = null
+
+        logger.info("Starting concurrent insert and refresh test")
+
+        // Insert task: run 10 inserts, fail if any takes >1min
+        def insertTask = {
+            try {
+                for (int i = 1; i <= insertCount; i++) {
+                    def start = System.currentTimeMillis()
+                    sql insert_sql
+                    def duration = System.currentTimeMillis() - start
+
+                    if (duration > insertTimeoutMs) {
+                        throw new RuntimeException("Insert ${i} took 
${duration}ms > ${insertTimeoutMs}ms")
+                    }
+
+                    logger.info("Insert ${i} completed in ${duration}ms")
+                    Thread.sleep(100)
+                }
+                insertCompleted = true
+            } catch (Exception e) {
+                insertException = e
+            }
+        }
+
+        // Refresh task: keep refreshing while inserts are running
+        def refreshTask = {
+            while (!insertCompleted && insertException == null) {
+                try {
+                    sql refresh_sql
+                    Thread.sleep(200)
+                } catch (Exception e) {
+                    logger.warn("Refresh failed: ${e.message}")
+                    Thread.sleep(200)
+                }
+            }
+        }
+
+        // Start both tasks
+        def insertThread = Thread.start(insertTask)
+        def refreshThread = Thread.start(refreshTask)
+
+        // Wait for insert thread with 1 minute total timeout
+        insertThread.join(60000) // 1 minute total
+
+        // Force stop both threads if still running
+        if (insertThread.isAlive()) {
+            insertThread.interrupt()
+        }
+        refreshThread.interrupt()
+
+        // Check results
+        if (insertException != null) {
+            throw new RuntimeException("Test failed: 
${insertException.message}")
+        }
+
+        if (!insertCompleted) {
+            throw new RuntimeException("Test failed: Inserts did not complete 
within 1 minute")
+        }
+
+        logger.info("✅ Test PASSED - All ${insertCount} inserts completed 
within timeout")
+
+        // Cleanup
+        sql """drop table if exists taxis"""
+        sql """drop database if exists db_iceberg_insert_refresh"""
+        sql """drop catalog if exists ${iceberg_catalog_name}"""
+
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to