morningman commented on code in PR #17884:
URL: https://github.com/apache/doris/pull/17884#discussion_r1160385665
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -19,25 +19,38 @@
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.qe.DdlExecutor;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
// Manager for refresh database and table action
public class RefreshManager {
private static final Logger LOG =
LogManager.getLogger(RefreshManager.class);
+ private ScheduledThreadPoolExecutor refreshScheduler =
ThreadPoolManager.newDaemonScheduledThreadPool(1,
+ "catalog-refresh-timer-pool", true);
+ // Unit:SECONDS
+ private static final int REFRESH_TIME = 5;
Review Comment:
```suggestion
private static final int REFRESH_TIME_SEC = 5;
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -19,25 +19,38 @@
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.qe.DdlExecutor;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
// Manager for refresh database and table action
public class RefreshManager {
private static final Logger LOG =
LogManager.getLogger(RefreshManager.class);
+ private ScheduledThreadPoolExecutor refreshScheduler =
ThreadPoolManager.newDaemonScheduledThreadPool(1,
+ "catalog-refresh-timer-pool", true);
+ // Unit:SECONDS
+ private static final int REFRESH_TIME = 5;
+ // key is the name of a catalog, value is an array of length 2, used to
store
Review Comment:
```suggestion
// key is the id of a catalog, value is an array of length 2, used to store
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +159,45 @@ private void
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
+
+ public void addToRefreshMap(long catalogId, Integer[] sec) {
+ refreshMap.put(catalogId, sec);
+ }
+
+ public void removeFromRefreshMap(long catalogId) {
+ refreshMap.remove(catalogId);
+ }
+
+ public void start() {
+ TaskRefresh taskRefresh = new TaskRefresh();
+ this.refreshScheduler.scheduleAtFixedRate(taskRefresh, 0, REFRESH_TIME,
+ TimeUnit.SECONDS);
+ }
+
+ private class TaskRefresh implements Runnable {
Review Comment:
```suggestion
private class RefreshTask implements Runnable {
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java:
##########
@@ -146,4 +159,45 @@ private void
refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) thro
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
+
+ public void addToRefreshMap(long catalogId, Integer[] sec) {
+ refreshMap.put(catalogId, sec);
+ }
+
+ public void removeFromRefreshMap(long catalogId) {
+ refreshMap.remove(catalogId);
+ }
+
+ public void start() {
+ TaskRefresh taskRefresh = new TaskRefresh();
+ this.refreshScheduler.scheduleAtFixedRate(taskRefresh, 0, REFRESH_TIME,
+ TimeUnit.SECONDS);
+ }
+
+ private class TaskRefresh implements Runnable {
+ @Override
+ public void run() {
+ for (Map.Entry<Long, Integer[]> entry : refreshMap.entrySet()) {
+ Long catalogId = entry.getKey();
+ Integer[] timeGroup = entry.getValue();
+ Integer original = timeGroup[0];
+ Integer current = timeGroup[1];
+ if (current - REFRESH_TIME > 0) {
+ timeGroup[1] = current - REFRESH_TIME;
+ refreshMap.put(catalogId, timeGroup);
+ } else {
+ String catalogName =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId).getName();
Review Comment:
The catalog may be dropped while this refresh task is running, so you
need to check whether the return value of `getCatalog` is null, or use
`getCatalogOrException` and catch the exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]