This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 5b22a1f1c66 branch-3.1: [fix](catalog) Fix lastUpdateTime not updated
on refresh and add scheduled refresh logs #58997 (#59143)
5b22a1f1c66 is described below
commit 5b22a1f1c660cf695e5154a7d32e441966b49d3a
Author: zy-kkk <[email protected]>
AuthorDate: Fri Dec 19 10:44:57 2025 +0800
branch-3.1: [fix](catalog) Fix lastUpdateTime not updated on refresh and
add scheduled refresh logs #58997 (#59143)
picked from #58997
---
.../org/apache/doris/catalog/RefreshManager.java | 13 ++-
.../org/apache/doris/datasource/CatalogMgr.java | 19 +++-
.../apache/doris/datasource/ExternalCatalog.java | 1 +
.../doris/datasource/RefreshCatalogTest.java | 16 ++--
.../test_jdbc_catalog_refresh_update_time.groovy | 104 +++++++++++++++++++++
5 files changed, 142 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 9738ee39355..2802055a887 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -272,10 +272,12 @@ public class RefreshManager {
}
public void addToRefreshMap(long catalogId, Integer[] sec) {
+ LOG.info("Add catalog id={} to scheduled refresh map, interval={}s",
catalogId, sec[0]);
refreshMap.put(catalogId, sec);
}
public void removeFromRefreshMap(long catalogId) {
+ LOG.info("Remove catalog (id={}) from scheduled refresh map",
catalogId);
refreshMap.remove(catalogId);
}
@@ -300,6 +302,8 @@ public class RefreshManager {
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog != null) {
String catalogName = catalog.getName();
+ LOG.info("Scheduled refresh triggered for catalog {}
(id={}), interval={}s, invalidCache=true",
+ catalogName, catalogId, original);
/**
* Now do not invoke
* {@link
org.apache.doris.analysis.RefreshCatalogStmt#analyze(Analyzer)} is ok,
@@ -307,13 +311,20 @@ public class RefreshManager {
* */
try {
Env.getCurrentEnv().getRefreshManager().handleRefreshCatalog(catalogName, true);
+ LOG.info("Scheduled refresh completed for catalog
{} (id={}), next refresh in {}s",
+ catalogName, catalogId, original);
} catch (Exception e) {
- LOG.warn("failed to refresh catalog {}",
catalogName, e);
+ LOG.warn("Failed to execute scheduled refresh for
catalog {} (id={})",
+ catalogName, catalogId, e);
}
// reset
timeGroup[1] = original;
refreshMap.put(catalogId, timeGroup);
+ } else {
+ LOG.warn("Scheduled refresh skipped: catalog id={} not
found, removing from refresh map",
+ catalogId);
+ refreshMap.remove(catalogId);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 57029ec5cc1..86cc33672b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -800,14 +800,27 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
}
public void registerCatalogRefreshListener(Env env) {
+ int registeredCount = 0;
for (CatalogIf catalog : idToCatalog.values()) {
Map<String, String> properties = catalog.getProperties();
if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
- Integer metadataRefreshIntervalSec =
Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC));
- Integer[] sec = {metadataRefreshIntervalSec,
metadataRefreshIntervalSec};
- env.getRefreshManager().addToRefreshMap(catalog.getId(), sec);
+ try {
+ Integer metadataRefreshIntervalSec =
Integer.valueOf(properties.get(METADATA_REFRESH_INTERVAL_SEC));
+ LOG.info("Registering scheduled refresh for catalog {}
(id={}), type={}, interval={}s",
+ catalog.getName(), catalog.getId(),
catalog.getType(), metadataRefreshIntervalSec);
+ Integer[] sec = {metadataRefreshIntervalSec,
metadataRefreshIntervalSec};
+ env.getRefreshManager().addToRefreshMap(catalog.getId(),
sec);
+ registeredCount++;
+ } catch (Exception e) {
+ LOG.warn("Failed to register scheduled refresh for catalog
{} (id={}), "
+ + "invalid {} value: {}",
+ catalog.getName(), catalog.getId(),
METADATA_REFRESH_INTERVAL_SEC,
+ properties.get(METADATA_REFRESH_INTERVAL_SEC), e);
+ }
}
}
+ LOG.info("Finished registering catalog refresh listeners, {} catalogs
with scheduled refresh enabled",
+ registeredCount);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 12cc3721c99..9e95d58db00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -605,6 +605,7 @@ public abstract class ExternalCatalog
* @param invalidCache
*/
public void onRefreshCache(boolean invalidCache) {
+ setLastUpdateTime(System.currentTimeMillis());
refreshMetaCacheOnly();
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index 56068c210fe..349b18cc509 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -125,16 +125,18 @@ public class RefreshCatalogTest extends TestWithFeService
{
@Test
public void testRefreshCatalogLastUpdateTime() throws Exception {
CatalogIf test2 = env.getCatalogMgr().getCatalog("test2");
- // init is 0
+ // lastUpdateTime is set when catalog is created (via
resetToUninitialized -> onRefreshCache)
long l1 = test2.getLastUpdateTime();
- Assertions.assertTrue(l1 == 0);
+ Assertions.assertTrue(l1 > 0);
TestExternalTable table = (TestExternalTable)
test2.getDbNullable("db1").getTable("tbl11").get();
- // getDb() triggered init method
+ // getDb() triggered init method, but lastUpdateTime was already set
long l2 = test2.getLastUpdateTime();
- Assertions.assertTrue(l2 > l1);
+ Assertions.assertTrue(l2 >= l1);
Assertions.assertFalse(table.isObjectCreated());
table.makeSureInitialized();
Assertions.assertTrue(table.isObjectCreated());
+
+ Thread.sleep(100); // wait a bit to ensure time difference
RefreshCatalogStmt refreshCatalogStmt = new
RefreshCatalogStmt("test2", null);
Assertions.assertTrue(refreshCatalogStmt.isInvalidCache());
try {
@@ -142,10 +144,10 @@ public class RefreshCatalogTest extends TestWithFeService
{
} catch (Exception e) {
// Do nothing
}
- // not triggered init method
+ // refresh should update lastUpdateTime
long l3 = test2.getLastUpdateTime();
- Assertions.assertTrue(l3 == l2);
- // when use_meta_cache is true, the table will be recreated after
refresh.
+ Assertions.assertTrue(l3 > l2);
+ // the table will be recreated after refresh.
// so we need to get table again
table = (TestExternalTable)
test2.getDbNullable("db1").getTable("tbl11").get();
Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy
new file mode 100644
index 00000000000..ff2d6cda08e
--- /dev/null
+++
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_catalog_refresh_update_time.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_jdbc_catalog_refresh_update_time",
"p0,external,mysql,external_docker,external_docker_mysql") {
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+
+ // Helper function to get LastUpdateTime for a catalog
+ // Column index: 0=CatalogId, 1=Name, 2=Type, 3=IsCurrent, 4=CreateTime,
5=LastUpdateTime
+ def getLastUpdateTime = { catalogName ->
+ def catalogs = sql """show catalogs"""
+ for (row in catalogs) {
+ if (row[1] == catalogName) {
+ return row[5]
+ }
+ }
+ return null
+ }
+
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String catalog_name = "test_refresh_update_time_catalog"
+
+ sql """drop catalog if exists ${catalog_name}"""
+
+ // Test 1: Manual refresh should update LastUpdateTime
+ sql """create catalog if not exists ${catalog_name} properties(
+ "type"="jdbc",
+ "user"="root",
+ "password"="123456",
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver"
+ );"""
+
+ // Get initial LastUpdateTime
+ String initialUpdateTime = getLastUpdateTime(catalog_name)
+ assertNotNull(initialUpdateTime, "Catalog ${catalog_name} should exist")
+
+ // Wait a bit to ensure time difference
+ Thread.sleep(2000)
+
+ // Manual refresh
+ sql """refresh catalog ${catalog_name}"""
+
+ // Get LastUpdateTime after refresh
+ String afterRefreshUpdateTime = getLastUpdateTime(catalog_name)
+
+ // Verify LastUpdateTime changed after manual refresh
+ assertTrue(afterRefreshUpdateTime != initialUpdateTime,
+ "LastUpdateTime should change after manual refresh. Initial:
${initialUpdateTime}, After: ${afterRefreshUpdateTime}")
+
+ sql """drop catalog if exists ${catalog_name}"""
+
+ // Test 2: Scheduled refresh should update LastUpdateTime
+ String scheduled_catalog_name = "test_scheduled_refresh_catalog"
+ sql """drop catalog if exists ${scheduled_catalog_name}"""
+
+ sql """create catalog if not exists ${scheduled_catalog_name} properties(
+ "type"="jdbc",
+ "user"="root",
+ "password"="123456",
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "metadata_refresh_interval_sec" = "5"
+ );"""
+
+ // Get initial LastUpdateTime
+ String scheduledInitialTime = getLastUpdateTime(scheduled_catalog_name)
+ assertNotNull(scheduledInitialTime, "Catalog ${scheduled_catalog_name}
should exist")
+
+ // Wait for scheduled refresh (interval is 5 seconds, wait 8 seconds to be
safe)
+ Thread.sleep(8000)
+
+ // Get LastUpdateTime after scheduled refresh
+ String scheduledAfterRefreshTime =
getLastUpdateTime(scheduled_catalog_name)
+
+ // Verify LastUpdateTime changed after scheduled refresh
+ assertTrue(scheduledAfterRefreshTime != scheduledInitialTime,
+ "LastUpdateTime should change after scheduled refresh. Initial:
${scheduledInitialTime}, After: ${scheduledAfterRefreshTime}")
+
+ sql """drop catalog if exists ${scheduled_catalog_name}"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]