morningman commented on code in PR #17884:
URL: https://github.com/apache/doris/pull/17884#discussion_r1157399902
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
+
+ public void registerIntoRefreshMap(String catalogName, Integer[] sec) {
Review Comment:
```suggestion
public void addToRefreshMap(String catalogName, Integer[] sec) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -462,6 +464,20 @@ public CatalogIf replayCreateCatalog(CatalogLog log,
boolean isReplay) throws Dd
writeLock();
try {
CatalogIf catalog = CatalogFactory.constructorFromLog(log);
+ Map<String, String> props = log.getProps();
+ if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
Review Comment:
This should be done after `((ExternalCatalog) catalog).checkProperties()`.
And all properties, including `METADATA_REFRESH_INTERVAL_SEC`, should be
checked in `checkProperties()`.
After `checkProperties()`, all properties should be valid.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
+
+ public void registerIntoRefreshMap(String catalogName, Integer[] sec) {
+ refreshMap.put(catalogName, sec);
+ }
+
+ public void logOutOfRefreshMap(String catalogName) {
+ refreshMap.remove(catalogName);
+ }
+
+ @Override
+ public void run() {
+ for (Map.Entry<String, Integer[]> entry : refreshMap.entrySet()) {
+ String catalogName = entry.getKey();
+ Integer[] timeGroup = entry.getValue();
+ Integer original = timeGroup[0];
+ Integer current = timeGroup[1];
+ if (current - REFRESH_TIME > 0) {
+ timeGroup[1] = current - REFRESH_TIME;
+ refreshMap.put(catalogName, timeGroup);
+ } else {
+ RefreshCatalogStmt refreshCatalogStmt = new
RefreshCatalogStmt(catalogName, null);
+ try {
+ DdlExecutor.execute(Env.getCurrentEnv(),
refreshCatalogStmt);
+ } catch (Exception e) {
+ e.printStackTrace();
Review Comment:
```suggestion
LOG.warn("failed to refresh catalog {}", catalogName, e);
```
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -462,6 +464,20 @@ public CatalogIf replayCreateCatalog(CatalogLog log,
boolean isReplay) throws Dd
writeLock();
try {
CatalogIf catalog = CatalogFactory.constructorFromLog(log);
+ Map<String, String> props = log.getProps();
+ if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
+ // need refresh
+ String catalogName = log.getCatalogName();
+ Integer metadataRefreshIntervalSec = null;
+ try {
+ metadataRefreshIntervalSec =
Integer.valueOf(props.get(METADATA_REFRESH_INTERVAL_SEC));
+ } catch (NumberFormatException e) {
+ throw new DdlException("Invalid properties: " +
METADATA_REFRESH_INTERVAL_SEC);
+ }
+ Integer[] sec = {metadataRefreshIntervalSec,
metadataRefreshIntervalSec};
+
Env.getCurrentEnv().getRefreshManager().registerIntoRefreshMap(catalogName,
sec);
Review Comment:
You also need to modify the `gsonPostProcess()` method in this class so that
it adds every catalog that needs refreshing to the `RefreshManager`;
otherwise this refresh info will be lost.
You can reproduce this bug with the following steps:
1. Create a catalog with the refresh property.
2. Restart FE, and wait for 2 minutes.
3. Restart FE again; you will see that the catalog no longer refreshes.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +164,36 @@ private void
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
+
+ public void registerIntoRefreshMap(String catalogName, Integer[] sec) {
+ refreshMap.put(catalogName, sec);
+ }
+
+ public void logOutOfRefreshMap(String catalogName) {
Review Comment:
```suggestion
public void removeFromRefreshMap(String catalogName) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -19,26 +19,44 @@
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.qe.DdlExecutor;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
// Manager for refresh database and table action
-public class RefreshManager {
+public class RefreshManager implements Runnable {
private static final Logger LOG =
LogManager.getLogger(RefreshManager.class);
+ private static final ScheduledThreadPoolExecutor REFRESH_TIMER =
ThreadPoolManager.newDaemonScheduledThreadPool(1,
+ "catalog-refresh-timer-pool", true);
+ // Unit:SECONDS
+ private static final int REFRESH_TIME = 5;
+ // key is the name of a catalog, value is an array of length 2, used to
store
+ // the original refresh time and the current remaining time of the catalog
+ private Map<String, Integer[]> refreshMap = Maps.newConcurrentMap();
Review Comment:
Use the `catalog id` as the map's key, so that this map does not need to be
updated when a catalog is renamed.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -123,6 +124,7 @@ private CatalogIf removeCatalog(long catalogId) {
if (catalog != null) {
catalog.onClose();
nameToCatalog.remove(catalog.getName());
+
Env.getCurrentEnv().getRefreshManager().logOutOfRefreshMap(catalog.getName());
Review Comment:
Move this to `catalog.onClose()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]