morningman commented on code in PR #17884:
URL: https://github.com/apache/doris/pull/17884#discussion_r1157399902


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void 
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
                 stmt.getTableName(), "ICEBERG", icebergProperties, "");
         env.createTable(createTableStmt);
     }
+
+    public void registerIntoRefreshMap(String catalogName, Integer[] sec) {

Review Comment:
   ```suggestion
       public void addToRefreshMap(String catalogName, Integer[] sec) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -462,6 +464,20 @@ public CatalogIf replayCreateCatalog(CatalogLog log, 
boolean isReplay) throws Dd
         writeLock();
         try {
             CatalogIf catalog = CatalogFactory.constructorFromLog(log);
+            Map<String, String> props = log.getProps();
+            if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {

Review Comment:
   This should be done after `((ExternalCatalog) catalog).checkProperties()`.
   And all properties, including `METADATA_REFRESH_INTERVAL_SEC`, should be 
checked in `checkProperties()`.
   After `checkProperties()`, all properties should be valid.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void 
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
                 stmt.getTableName(), "ICEBERG", icebergProperties, "");
         env.createTable(createTableStmt);
     }
+
+    public void registerIntoRefreshMap(String catalogName, Integer[] sec) {
+        refreshMap.put(catalogName, sec);
+    }
+
+    public void logOutOfRefreshMap(String catalogName) {
+        refreshMap.remove(catalogName);
+    }
+
+    @Override
+    public void run() {
+        for (Map.Entry<String, Integer[]> entry : refreshMap.entrySet()) {
+            String catalogName = entry.getKey();
+            Integer[] timeGroup = entry.getValue();
+            Integer original = timeGroup[0];
+            Integer current = timeGroup[1];
+            if (current - REFRESH_TIME > 0) {
+                timeGroup[1] = current - REFRESH_TIME;
+                refreshMap.put(catalogName, timeGroup);
+            } else {
+                RefreshCatalogStmt refreshCatalogStmt = new 
RefreshCatalogStmt(catalogName, null);
+                try {
+                    DdlExecutor.execute(Env.getCurrentEnv(), 
refreshCatalogStmt);
+                } catch (Exception e) {
+                    e.printStackTrace();

Review Comment:
   ```suggestion
                       LOG.warn("failed to refresh catalog {}", catalogName, e);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -462,6 +464,20 @@ public CatalogIf replayCreateCatalog(CatalogLog log, 
boolean isReplay) throws Dd
         writeLock();
         try {
             CatalogIf catalog = CatalogFactory.constructorFromLog(log);
+            Map<String, String> props = log.getProps();
+            if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
+                // need refresh
+                String catalogName = log.getCatalogName();
+                Integer metadataRefreshIntervalSec = null;
+                try {
+                    metadataRefreshIntervalSec = 
Integer.valueOf(props.get(METADATA_REFRESH_INTERVAL_SEC));
+                } catch (NumberFormatException e) {
+                    throw new DdlException("Invalid properties: " + 
METADATA_REFRESH_INTERVAL_SEC);
+                }
+                Integer[] sec = {metadataRefreshIntervalSec, 
metadataRefreshIntervalSec};
+                
Env.getCurrentEnv().getRefreshManager().registerIntoRefreshMap(catalogName, 
sec);

Review Comment:
   You also need to modify the method `gsonPostProcess()` in this class.
   To add all catalogs which need to refresh to the RefreshManger, or these 
refresh info will be lost.
   
   You can test this bug with following steps:
   1. create a catalog with refresh property.
   2. Restart FE, and wait for 2 minutes.
   3. Restart FE again, you will see that the catalog won't refresh anymore.
   



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void 
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
                 stmt.getTableName(), "ICEBERG", icebergProperties, "");
         env.createTable(createTableStmt);
     }
+
+    public void registerIntoRefreshMap(String catalogName, Integer[] sec) {
+        refreshMap.put(catalogName, sec);
+    }
+
+    public void logOutOfRefreshMap(String catalogName) {

Review Comment:
   ```suggestion
       public void removeFromRefreshMap(String catalogName) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -19,26 +19,44 @@
 
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
 import org.apache.doris.analysis.RefreshDbStmt;
 import org.apache.doris.analysis.RefreshTableStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.qe.DdlExecutor;
 
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 // Manager for refresh database and table action
-public class RefreshManager {
+public class RefreshManager implements Runnable {
     private static final Logger LOG = 
LogManager.getLogger(RefreshManager.class);
 
+    private static final ScheduledThreadPoolExecutor REFRESH_TIMER = 
ThreadPoolManager.newDaemonScheduledThreadPool(1,
+        "catalog-refresh-timer-pool", true);
+    // Unit:SECONDS
+    private static final int REFRESH_TIME = 5;
+    // key is the name of a catalog, value is an array of length 2, used to 
store
+    // the original refresh time and the current remaining time of the catalog
+    private Map<String, Integer[]> refreshMap = Maps.newConcurrentMap();

Review Comment:
   Use `catalog id` as the map's key.
   So that if catalog is renamed, no need to update this map.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -123,6 +124,7 @@ private CatalogIf removeCatalog(long catalogId) {
         if (catalog != null) {
             catalog.onClose();
             nameToCatalog.remove(catalog.getName());
+            
Env.getCurrentEnv().getRefreshManager().logOutOfRefreshMap(catalog.getName());

Review Comment:
   Move this to `catalog.onClose()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to