This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 57cc34c1b [Improvement]: Implement a stateless overview manager.
(#3429)
57cc34c1b is described below
commit 57cc34c1bf24edac596ef3a343a2c2473ecd7498
Author: baiyangtx <[email protected]>
AuthorDate: Fri Feb 7 12:02:17 2025 +0800
[Improvement]: Implement a stateless overview manager. (#3429)
* table optimizing info
* table runtime meta
* Controller donot depend on tableService
* ci
* ci
* ci
* ci
* stateless overview
* stateless overview
* ci
* ci
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 12 +
.../amoro/server/dashboard/DashboardServer.java | 3 +-
.../amoro/server/dashboard/OverviewCache.java | 303 ---------------------
.../amoro/server/dashboard/OverviewManager.java | 274 +++++++++++++++++++
.../server/dashboard/OverviewMetricsReporter.java | 78 ------
.../dashboard/controller/OverviewController.java | 26 +-
.../dashboard/model/OverviewTopTableItem.java | 2 +-
.../persistence/mapper/CatalogMetaMapper.java | 3 +
.../org.apache.amoro.metrics.MetricReporter | 1 -
...OverviewCache.java => TestOverviewManager.java} | 69 +++--
dist/src/main/amoro-bin/conf/config.yaml | 4 +
.../amoro-bin/conf/plugins/metric-reporters.yaml | 6 -
12 files changed, 343 insertions(+), 438 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 04ac28d8e..02c6d938f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -288,6 +288,18 @@ public class AmoroManagementConf {
.withDescription(
"The number of hours that self-optimizing runtime data expire
interval.");
+ public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
+ ConfigOptions.key("overview-cache.refresh-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(180))
+ .withDescription("Interval for refreshing overview cache.");
+
+ public static final ConfigOption<Integer> OVERVIEW_CACHE_MAX_SIZE =
+ ConfigOptions.key("overview-cache.max-size")
+ .intType()
+ .defaultValue(3360)
+ .withDescription("Max size of overview cache.");
+
public static final ConfigOption<String> DB_TYPE =
ConfigOptions.key("database.type")
.stringType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index d21c83813..acb13e55f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -117,7 +117,8 @@ public class DashboardServer {
new TableController(catalogManager, tableManager, tableDescriptor,
serviceConfig);
this.terminalController = new TerminalController(terminalManager);
this.versionController = new VersionController();
- this.overviewController = new OverviewController();
+ OverviewManager manager = new OverviewManager(serviceConfig);
+ this.overviewController = new OverviewController(manager);
this.authType =
serviceConfig.get(AmoroManagementConf.HTTP_SERVER_REST_AUTH_TYPE);
this.basicAuthUser = serviceConfig.get(AmoroManagementConf.ADMIN_USERNAME);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java
deleted file mode 100644
index 50994b541..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.dashboard;
-
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_COMMITTING_TABLES;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_EXECUTING_TABLES;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_IDLE_TABLES;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_MEMORY_BYTES_ALLOCATED;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_PENDING_TABLES;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_PLANING_TABLES;
-import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_THREADS;
-import static
org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_HEALTH_SCORE;
-import static
org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES;
-import static
org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES_SIZE;
-
-import org.apache.amoro.metrics.Counter;
-import org.apache.amoro.metrics.Gauge;
-import org.apache.amoro.metrics.Metric;
-import org.apache.amoro.metrics.MetricDefine;
-import org.apache.amoro.metrics.MetricKey;
-import org.apache.amoro.metrics.MetricSet;
-import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem;
-import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem;
-import org.apache.amoro.server.dashboard.model.OverviewTopTableItem;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public class OverviewCache {
-
- public static final String STATUS_PENDING = "Pending";
- public static final String STATUS_PLANING = "Planing";
- public static final String STATUS_EXECUTING = "Executing";
- public static final String STATUS_IDLE = "Idle";
- public static final String STATUS_COMMITTING = "Committing";
-
- private static final Logger LOG =
LoggerFactory.getLogger(OverviewCache.class);
-
- private volatile List<OverviewTopTableItem> allTopTableItem = new
ArrayList<>();
- private final Map<String, Long> optimizingStatusCountMap = new
ConcurrentHashMap<>();
- private final ConcurrentLinkedDeque<OverviewResourceUsageItem>
resourceUsageHistory =
- new ConcurrentLinkedDeque<>();
- private final ConcurrentLinkedDeque<OverviewDataSizeItem> dataSizeHistory =
- new ConcurrentLinkedDeque<>();
- private final AtomicInteger totalCatalog = new AtomicInteger();
- private final AtomicLong totalDataSize = new AtomicLong();
- private final AtomicInteger totalTableCount = new AtomicInteger();
- private final AtomicInteger totalCpu = new AtomicInteger();
- private final AtomicLong totalMemory = new AtomicLong();
-
- private MetricSet metricSet;
- private int maxRecordCount;
- private static volatile OverviewCache INSTANCE;
-
- private OverviewCache() {}
-
- /** @return Get the singleton object. */
- public static OverviewCache getInstance() {
- if (INSTANCE == null) {
- synchronized (OverviewCache.class) {
- if (INSTANCE == null) {
- INSTANCE = new OverviewCache();
- }
- }
- }
- return INSTANCE;
- }
-
- public void initialize(int maxRecordCount, MetricSet globalMetricSet) {
- this.maxRecordCount = maxRecordCount;
- this.metricSet = globalMetricSet;
- }
-
- public List<OverviewTopTableItem> getAllTopTableItem() {
- return ImmutableList.copyOf(allTopTableItem);
- }
-
- public int getTotalCatalog() {
- return totalCatalog.get();
- }
-
- public int getTotalTableCount() {
- return totalTableCount.get();
- }
-
- public long getTotalDataSize() {
- return totalDataSize.get();
- }
-
- public int getTotalCpu() {
- return totalCpu.get();
- }
-
- public long getTotalMemory() {
- return totalMemory.get();
- }
-
- public List<OverviewResourceUsageItem> getResourceUsageHistory(long
startTime) {
- return resourceUsageHistory.stream()
- .filter(item -> item.getTs() >= startTime)
- .collect(Collectors.toList());
- }
-
- public List<OverviewDataSizeItem> getDataSizeHistory(long startTime) {
- return dataSizeHistory.stream()
- .filter(item -> item.getTs() >= startTime)
- .collect(Collectors.toList());
- }
-
- public Map<String, Long> getOptimizingStatus() {
- return optimizingStatusCountMap;
- }
-
- public void refresh() {
- long start = System.currentTimeMillis();
- LOG.info("Updating overview cache");
- try {
- // Already registered metrics may change,
- // so the metricDefineMap needs to be recalculated at each refresh
- Map<MetricDefine, List<MetricKey>> metricDefineMap =
- metricSet.getMetrics().keySet().stream()
- .collect(
- Collectors.groupingBy(
- MetricKey::getDefine,
- Collectors.mapping(Function.identity(),
Collectors.toList())));
-
- updateResourceUsage(start, metricDefineMap);
- updateTableDetail(start, metricDefineMap);
- updateOptimizingStatus(metricDefineMap);
-
- } catch (Exception e) {
- LOG.error("OverviewRefresher error", e);
- } finally {
- long end = System.currentTimeMillis();
- LOG.info("Refresher overview cache took {} ms.", end - start);
- }
- }
-
- private void updateResourceUsage(long ts, Map<MetricDefine, List<MetricKey>>
metricDefineMap) {
- int optimizerGroupThreadCount =
- (int) sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_THREADS);
- long optimizerGroupMemoryBytes =
- sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_MEMORY_BYTES_ALLOCATED);
-
- this.totalCpu.set(optimizerGroupThreadCount);
- this.totalMemory.set(optimizerGroupMemoryBytes);
- addAndCheck(
- new OverviewResourceUsageItem(ts, optimizerGroupThreadCount,
optimizerGroupMemoryBytes));
- }
-
- private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>>
metricDefineMap) {
- Map<String, OverviewTopTableItem> topTableItemMap = Maps.newHashMap();
- Set<String> allCatalogs = Sets.newHashSet();
- Map<MetricKey, Metric> registeredMetrics = metricSet.getMetrics();
- long totalTableSize = 0L;
-
- // table size
- List<MetricKey> metricKeys =
- metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES_SIZE,
ImmutableList.of());
- for (MetricKey metricKey : metricKeys) {
- String tableName = fullTableName(metricKey);
- allCatalogs.add(catalog(metricKey));
- OverviewTopTableItem tableItem =
- topTableItemMap.computeIfAbsent(tableName, ignore -> new
OverviewTopTableItem(tableName));
- long tableSize = covertValue(registeredMetrics.get(metricKey));
- tableItem.setTableSize(tableSize);
- totalTableSize += tableSize;
- }
-
- // file count
- metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES,
ImmutableList.of());
- for (MetricKey metricKey : metricKeys) {
- String tableName = fullTableName(metricKey);
- allCatalogs.add(catalog(metricKey));
- OverviewTopTableItem tableItem =
- topTableItemMap.computeIfAbsent(tableName, ignore -> new
OverviewTopTableItem(tableName));
- int fileCount = (int) covertValue(registeredMetrics.get(metricKey));
- tableItem.setFileCount(fileCount);
- tableItem.setAverageFileSize(fileCount == 0 ? 0 :
tableItem.getTableSize() / fileCount);
- }
-
- // health score
- metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_HEALTH_SCORE,
ImmutableList.of());
- for (MetricKey metricKey : metricKeys) {
- String tableName = fullTableName(metricKey);
- allCatalogs.add(catalog(metricKey));
- OverviewTopTableItem tableItem =
- topTableItemMap.computeIfAbsent(tableName, ignore -> new
OverviewTopTableItem(tableName));
- int healthScore = (int) covertValue(registeredMetrics.get(metricKey));
- tableItem.setHealthScore(healthScore);
- }
-
- this.totalDataSize.set(totalTableSize);
- this.totalCatalog.set(allCatalogs.size());
- this.totalTableCount.set(topTableItemMap.size());
- this.allTopTableItem = new ArrayList<>(topTableItemMap.values());
- addAndCheck(new OverviewDataSizeItem(ts, totalTableSize));
- }
-
- private void updateOptimizingStatus(Map<MetricDefine, List<MetricKey>>
metricDefineMap) {
- optimizingStatusCountMap.put(
- STATUS_PENDING, sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_PENDING_TABLES));
- optimizingStatusCountMap.put(
- STATUS_PLANING, sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_PLANING_TABLES));
- optimizingStatusCountMap.put(
- STATUS_EXECUTING,
- sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_EXECUTING_TABLES));
- optimizingStatusCountMap.put(
- STATUS_IDLE, sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_IDLE_TABLES));
- optimizingStatusCountMap.put(
- STATUS_COMMITTING,
- sumMetricValuesByDefine(metricDefineMap,
OPTIMIZER_GROUP_COMMITTING_TABLES));
- }
-
- private void addAndCheck(OverviewDataSizeItem dataSizeItem) {
- dataSizeHistory.add(dataSizeItem);
- checkSize(dataSizeHistory);
- }
-
- private void addAndCheck(OverviewResourceUsageItem resourceUsageItem) {
- resourceUsageHistory.add(resourceUsageItem);
- checkSize(resourceUsageHistory);
- }
-
- private <T> void checkSize(Deque<T> deque) {
- if (deque.size() > maxRecordCount) {
- deque.poll();
- }
- }
-
- private String fullTableName(MetricKey metricKey) {
- return catalog(metricKey)
- .concat(".")
- .concat(database(metricKey))
- .concat(".")
- .concat(table(metricKey));
- }
-
- private String catalog(MetricKey metricKey) {
- return metricKey.valueOfTag("catalog");
- }
-
- private String database(MetricKey metricKey) {
- return metricKey.valueOfTag("database");
- }
-
- private String table(MetricKey metricKey) {
- return metricKey.valueOfTag("table");
- }
-
- private long sumMetricValuesByDefine(
- Map<MetricDefine, List<MetricKey>> metricDefineMap, MetricDefine
metricDefine) {
- List<MetricKey> metricKeys = metricDefineMap.get(metricDefine);
- if ((metricKeys == null)) {
- return 0;
- }
- return metricKeys.stream()
- .map(metricKey -> covertValue(metricSet.getMetrics().get(metricKey)))
- .mapToLong(Long::longValue)
- .sum();
- }
-
- private long covertValue(Metric metric) {
- if (metric instanceof Counter) {
- return ((Counter) metric).getCount();
- } else if (metric instanceof Gauge) {
- return ((Gauge<?>) metric).getValue().longValue();
- } else {
- throw new IllegalStateException(
- "unknown metric implement class:" + metric.getClass().getName());
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java
new file mode 100644
index 000000000..3de5adabc
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem;
+import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem;
+import org.apache.amoro.server.dashboard.model.OverviewTopTableItem;
+import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.resource.OptimizerInstance;
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class OverviewManager extends PersistentBase {
+
+ public static final String STATUS_PENDING = "Pending";
+ public static final String STATUS_PLANING = "Planing";
+ public static final String STATUS_EXECUTING = "Executing";
+ public static final String STATUS_IDLE = "Idle";
+ public static final String STATUS_COMMITTING = "Committing";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OverviewManager.class);
+ private final List<OverviewTopTableItem> allTopTableItem = new ArrayList<>();
+ private final Map<String, Long> optimizingStatusCountMap = new
ConcurrentHashMap<>();
+ private final ConcurrentLinkedDeque<OverviewResourceUsageItem>
resourceUsageHistory =
+ new ConcurrentLinkedDeque<>();
+ private final ConcurrentLinkedDeque<OverviewDataSizeItem> dataSizeHistory =
+ new ConcurrentLinkedDeque<>();
+ private final AtomicInteger totalCatalog = new AtomicInteger();
+ private final AtomicLong totalDataSize = new AtomicLong();
+ private final AtomicInteger totalTableCount = new AtomicInteger();
+ private final AtomicInteger totalCpu = new AtomicInteger();
+ private final AtomicLong totalMemory = new AtomicLong();
+
+ private final int maxRecordCount;
+
+ public OverviewManager(Configurations serverConfigs) {
+ this(
+ serverConfigs.getInteger(AmoroManagementConf.OVERVIEW_CACHE_MAX_SIZE),
+
serverConfigs.get(AmoroManagementConf.OVERVIEW_CACHE_REFRESH_INTERVAL));
+ }
+
+ @VisibleForTesting
+ public OverviewManager(int maxRecordCount, Duration refreshInterval) {
+ this.maxRecordCount = maxRecordCount;
+ ScheduledExecutorService overviewUpdaterScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("overview-refresh-scheduler-%d")
+ .setDaemon(true)
+ .build());
+ resetStatusMap();
+
+ if (refreshInterval.toMillis() > 0) {
+ overviewUpdaterScheduler.scheduleAtFixedRate(
+ this::refresh, 1000L, refreshInterval.toMillis(),
TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public List<OverviewTopTableItem> getAllTopTableItem() {
+ return ImmutableList.copyOf(allTopTableItem);
+ }
+
+ public int getTotalCatalog() {
+ return totalCatalog.get();
+ }
+
+ public int getTotalTableCount() {
+ return totalTableCount.get();
+ }
+
+ public long getTotalDataSize() {
+ return totalDataSize.get();
+ }
+
+ public int getTotalCpu() {
+ return totalCpu.get();
+ }
+
+ public long getTotalMemory() {
+ return totalMemory.get();
+ }
+
+ public List<OverviewResourceUsageItem> getResourceUsageHistory(long
startTime) {
+ return resourceUsageHistory.stream()
+ .filter(item -> item.getTs() >= startTime)
+ .collect(Collectors.toList());
+ }
+
+ public List<OverviewDataSizeItem> getDataSizeHistory(long startTime) {
+ return dataSizeHistory.stream()
+ .filter(item -> item.getTs() >= startTime)
+ .collect(Collectors.toList());
+ }
+
+ public Map<String, Long> getOptimizingStatus() {
+ return optimizingStatusCountMap;
+ }
+
+ @VisibleForTesting
+ public void refresh() {
+ long start = System.currentTimeMillis();
+ LOG.info("Refreshing overview cache");
+ try {
+ refreshTableCache(start);
+ refreshResourceUsage(start);
+
+ } catch (Exception e) {
+ LOG.error("Refreshed overview cache failed", e);
+ } finally {
+ long end = System.currentTimeMillis();
+ LOG.info("Refreshed overview cache in {} ms.", end - start);
+ }
+ }
+
+ private void refreshTableCache(long ts) {
+ int totalCatalogs = getAs(CatalogMetaMapper.class,
CatalogMetaMapper::selectCatalogCount);
+
+ List<TableRuntimeMeta> metas =
+ getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas);
+ AtomicLong totalDataSize = new AtomicLong();
+ AtomicInteger totalFileCounts = new AtomicInteger();
+ Map<String, OverviewTopTableItem> topTableItemMap = Maps.newHashMap();
+ Map<String, Long> optimizingStatusMap = Maps.newHashMap();
+ for (TableRuntimeMeta meta : metas) {
+ Optional<OverviewTopTableItem> optItem = toTopTableItem(meta);
+ optItem.ifPresent(
+ tableItem -> {
+ topTableItemMap.put(tableItem.getTableName(), tableItem);
+ totalDataSize.addAndGet(tableItem.getTableSize());
+ totalFileCounts.addAndGet(tableItem.getFileCount());
+ });
+ String status = statusToMetricString(meta.getTableStatus());
+ if (StringUtils.isNotEmpty(status)) {
+ optimizingStatusMap.putIfAbsent(status, 0L);
+ optimizingStatusMap.computeIfPresent(status, (k, v) -> v + 1);
+ }
+ }
+
+ this.totalCatalog.set(totalCatalogs);
+ this.totalTableCount.set(topTableItemMap.size());
+ this.totalDataSize.set(totalDataSize.get());
+ this.allTopTableItem.clear();
+ this.allTopTableItem.addAll(topTableItemMap.values());
+ addAndCheck(new OverviewDataSizeItem(ts, this.totalDataSize.get()));
+ resetStatusMap();
+ this.optimizingStatusCountMap.putAll(optimizingStatusMap);
+ }
+
+ private Optional<OverviewTopTableItem> toTopTableItem(TableRuntimeMeta meta)
{
+ if (meta == null) {
+ return Optional.empty();
+ }
+ OverviewTopTableItem tableItem = new
OverviewTopTableItem(fullTableName(meta));
+ if (meta.getPendingInput() != null) {
+ tableItem.setTableSize(meta.getPendingInput().getTotalFileSize());
+ tableItem.setFileCount(meta.getPendingInput().getTotalFileCount());
+ tableItem.setHealthScore(meta.getPendingInput().getHealthScore());
+ }
+ tableItem.setAverageFileSize(
+ tableItem.getFileCount() == 0 ? 0 : tableItem.getTableSize() /
tableItem.getFileCount());
+ return Optional.of(tableItem);
+ }
+
+ private String statusToMetricString(OptimizingStatus status) {
+ if (status == null) {
+ return null;
+ }
+ switch (status) {
+ case PENDING:
+ return STATUS_PENDING;
+ case PLANNING:
+ return STATUS_PLANING;
+ case MINOR_OPTIMIZING:
+ case MAJOR_OPTIMIZING:
+ case FULL_OPTIMIZING:
+ return STATUS_EXECUTING;
+ case IDLE:
+ return STATUS_IDLE;
+ case COMMITTING:
+ return STATUS_COMMITTING;
+ default:
+ return null;
+ }
+ }
+
+ private void resetStatusMap() {
+ optimizingStatusCountMap.clear();
+ optimizingStatusCountMap.put(STATUS_PENDING, 0L);
+ optimizingStatusCountMap.put(STATUS_PLANING, 0L);
+ optimizingStatusCountMap.put(STATUS_EXECUTING, 0L);
+ optimizingStatusCountMap.put(STATUS_IDLE, 0L);
+ optimizingStatusCountMap.put(STATUS_COMMITTING, 0L);
+ }
+
+ private void refreshResourceUsage(long ts) {
+ List<OptimizerInstance> instances = getAs(OptimizerMapper.class,
OptimizerMapper::selectAll);
+ AtomicInteger cpuCount = new AtomicInteger();
+ AtomicLong memoryBytes = new AtomicLong();
+ for (OptimizerInstance instance : instances) {
+ cpuCount.addAndGet(instance.getThreadCount());
+ memoryBytes.addAndGet(instance.getMemoryMb() * 1024L * 1024L);
+ }
+ this.totalCpu.set(cpuCount.get());
+ this.totalMemory.set(memoryBytes.get());
+ addAndCheck(new OverviewResourceUsageItem(ts, cpuCount.get(),
memoryBytes.get()));
+ }
+
+ private void addAndCheck(OverviewDataSizeItem dataSizeItem) {
+ dataSizeHistory.add(dataSizeItem);
+ checkSize(dataSizeHistory);
+ }
+
+ private void addAndCheck(OverviewResourceUsageItem resourceUsageItem) {
+ resourceUsageHistory.add(resourceUsageItem);
+ checkSize(resourceUsageHistory);
+ }
+
+ private <T> void checkSize(Deque<T> deque) {
+ if (deque.size() > maxRecordCount) {
+ deque.poll();
+ }
+ }
+
+ private String fullTableName(TableRuntimeMeta meta) {
+ return meta.getCatalogName()
+ .concat(".")
+ .concat(meta.getDbName())
+ .concat(".")
+ .concat(meta.getTableName());
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java
deleted file mode 100644
index 42a9facdc..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.dashboard;
-
-import org.apache.amoro.metrics.MetricReporter;
-import org.apache.amoro.metrics.MetricSet;
-import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/** Overview exporter */
-public class OverviewMetricsReporter implements MetricReporter {
-
- public static final String REFRESH_INTERVAL = "refresh-interval";
- public static final String MAX_HISTORY_RECORDS = "max-history-records";
-
- private final ScheduledExecutorService overviewUpdaterScheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("overview-updater-scheduler-%d")
- .setDaemon(true)
- .build());
-
- private long overviewRefreshingInterval;
- private int maxRecordCount;
-
- @Override
- public void open(Map<String, String> properties) {
- overviewRefreshingInterval =
- Optional.ofNullable(properties.get(REFRESH_INTERVAL))
- .map(Long::valueOf)
- .orElseThrow(
- () -> new IllegalArgumentException("Lack required property: "
+ REFRESH_INTERVAL));
-
- maxRecordCount =
- Optional.ofNullable(properties.get(MAX_HISTORY_RECORDS))
- .map(Integer::valueOf)
- .orElseThrow(
- () ->
- new IllegalArgumentException("Lack required property: " +
MAX_HISTORY_RECORDS));
- }
-
- @Override
- public void close() {}
-
- @Override
- public String name() {
- return "overview-exporter";
- }
-
- @Override
- public void setGlobalMetricSet(MetricSet globalMetricSet) {
- OverviewCache overviewCache = OverviewCache.getInstance();
- overviewCache.initialize(maxRecordCount, globalMetricSet);
- overviewUpdaterScheduler.scheduleAtFixedRate(
- overviewCache::refresh, 1000L, overviewRefreshingInterval,
TimeUnit.MILLISECONDS);
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java
index 484cf42bb..e541de989 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java
@@ -19,7 +19,7 @@
package org.apache.amoro.server.dashboard.controller;
import io.javalin.http.Context;
-import org.apache.amoro.server.dashboard.OverviewCache;
+import org.apache.amoro.server.dashboard.OverviewManager;
import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem;
import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem;
import org.apache.amoro.server.dashboard.model.OverviewSummary;
@@ -38,17 +38,17 @@ import java.util.stream.Collectors;
/** The controller that handles overview page requests. */
public class OverviewController {
- private OverviewCache overviewCache;
+ private final OverviewManager manager;
- public OverviewController() {
- this.overviewCache = OverviewCache.getInstance();
+ public OverviewController(OverviewManager manager) {
+ this.manager = manager;
}
public void getResourceUsageHistory(Context ctx) {
String startTime = ctx.queryParam("startTime");
Preconditions.checkArgument(StringUtils.isNumeric(startTime), "invalid
startTime!");
List<OverviewResourceUsageItem> resourceUsageHistory =
- overviewCache.getResourceUsageHistory(Long.parseLong(startTime));
+ manager.getResourceUsageHistory(Long.parseLong(startTime));
ctx.json(OkResponse.of(resourceUsageHistory));
}
@@ -56,7 +56,7 @@ public class OverviewController {
String startTime = ctx.queryParam("startTime");
Preconditions.checkArgument(StringUtils.isNumeric(startTime), "invalid
startTime!");
List<OverviewDataSizeItem> dataSizeHistory =
- overviewCache.getDataSizeHistory(Long.parseLong(startTime));
+ manager.getDataSizeHistory(Long.parseLong(startTime));
ctx.json(OkResponse.of(dataSizeHistory));
}
@@ -98,7 +98,7 @@ public class OverviewController {
private List<OverviewTopTableItem> getTopTables(
boolean asc, Comparator<OverviewTopTableItem> comparator, int limit) {
- return overviewCache.getAllTopTableItem().stream()
+ return manager.getAllTopTableItem().stream()
.sorted(
asc
? comparator.thenComparing(OverviewTopTableItem::getTableName)
@@ -108,11 +108,11 @@ public class OverviewController {
}
public void getSummary(Context ctx) {
- int totalCatalog = overviewCache.getTotalCatalog();
- int totalTableCount = overviewCache.getTotalTableCount();
- long totalDataSize = overviewCache.getTotalDataSize();
- int totalCpu = overviewCache.getTotalCpu();
- long totalMemory = overviewCache.getTotalMemory();
+ int totalCatalog = manager.getTotalCatalog();
+ int totalTableCount = manager.getTotalTableCount();
+ long totalDataSize = manager.getTotalDataSize();
+ int totalCpu = manager.getTotalCpu();
+ long totalMemory = manager.getTotalMemory();
OverviewSummary overviewSummary =
new OverviewSummary(totalCatalog, totalTableCount, totalDataSize,
totalCpu, totalMemory);
@@ -120,7 +120,7 @@ public class OverviewController {
}
public void getOptimizingStatus(Context ctx) {
- Map<String, Long> optimizingStatus = overviewCache.getOptimizingStatus();
+ Map<String, Long> optimizingStatus = manager.getOptimizingStatus();
List<ImmutableMap<String, ? extends Serializable>> optimizingStatusList =
optimizingStatus.entrySet().stream()
.map(status -> ImmutableMap.of("name", status.getKey(), "value",
status.getValue()))
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java
index 2fffc9b8e..64e5fa86c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java
@@ -25,7 +25,7 @@ public class OverviewTopTableItem {
private long tableSize;
private int fileCount;
private long averageFileSize;
- private int healthScore;
+ private int healthScore = -1;
public OverviewTopTableItem() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
index ee62866ae..4058002b2 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
@@ -114,6 +114,9 @@ public interface CatalogMetaMapper {
@Select("SELECT database_count FROM " + TABLE_NAME + " WHERE catalog_name =
#{catalogName}")
Integer selectDatabaseCount(@Param("catalogName") String catalogName);
+ @Select("SELECT COUNT(*) FROM " + TABLE_NAME)
+ Integer selectCatalogCount();
+
@Update(
"UPDATE "
+ TABLE_NAME
diff --git
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter
index 57053ec93..01d705720 100644
---
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter
+++
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter
@@ -16,4 +16,3 @@
# limitations under the License.
#
-org.apache.amoro.server.dashboard.OverviewMetricsReporter
\ No newline at end of file
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
similarity index 62%
rename from
amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
rename to
amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
index 5ac3c85c3..acffaa942 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
@@ -18,11 +18,11 @@
package org.apache.amoro.server.dashboard;
-import static
org.apache.amoro.server.dashboard.OverviewCache.STATUS_COMMITTING;
-import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_EXECUTING;
-import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_IDLE;
-import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_PENDING;
-import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_PLANING;
+import static
org.apache.amoro.server.dashboard.OverviewManager.STATUS_COMMITTING;
+import static
org.apache.amoro.server.dashboard.OverviewManager.STATUS_EXECUTING;
+import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_IDLE;
+import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_PENDING;
+import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_PLANING;
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
@@ -31,7 +31,6 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.server.dashboard.model.OverviewTopTableItem;
-import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.table.AMSTableTestBase;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.executor.TableRuntimeRefreshExecutor;
@@ -48,13 +47,14 @@ import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@RunWith(Parameterized.class)
-public class TestOverviewCache extends AMSTableTestBase {
+public class TestOverviewManager extends AMSTableTestBase {
- private OverviewCache overviewCache;
+ private OverviewManager overviewManager;
@Parameterized.Parameters(name = "{0}, {1}")
public static Object[] parameters() {
@@ -63,7 +63,7 @@ public class TestOverviewCache extends AMSTableTestBase {
};
}
- public TestOverviewCache(CatalogTestHelper catalogTestHelper,
TableTestHelper tableTestHelper) {
+ public TestOverviewManager(CatalogTestHelper catalogTestHelper,
TableTestHelper tableTestHelper) {
super(catalogTestHelper, tableTestHelper, false);
}
@@ -71,9 +71,8 @@ public class TestOverviewCache extends AMSTableTestBase {
public void prepare() {
createDatabase();
createTable();
- this.overviewCache = OverviewCache.getInstance();
- this.overviewCache.initialize(10,
MetricManager.getInstance().getGlobalRegistry());
- this.overviewCache.refresh();
+ this.overviewManager = new OverviewManager(10, Duration.ofMinutes(0));
+ this.overviewManager.refresh();
}
@After
@@ -117,38 +116,38 @@ public class TestOverviewCache extends AMSTableTestBase {
@Test
public void testOverviewCache() {
// empty table
- Assertions.assertEquals(1, overviewCache.getTotalCatalog());
- Assertions.assertEquals(1, overviewCache.getTotalTableCount());
- Assertions.assertEquals(0, overviewCache.getTotalDataSize());
- Assertions.assertEquals(0, overviewCache.getTotalCpu());
- Assertions.assertEquals(0, overviewCache.getTotalMemory());
-
- Assertions.assertEquals(0,
overviewCache.getOptimizingStatus().get(STATUS_PENDING));
- Assertions.assertEquals(0,
overviewCache.getOptimizingStatus().get(STATUS_COMMITTING));
- Assertions.assertEquals(0,
overviewCache.getOptimizingStatus().get(STATUS_EXECUTING));
- Assertions.assertEquals(0,
overviewCache.getOptimizingStatus().get(STATUS_PLANING));
- Assertions.assertEquals(1,
overviewCache.getOptimizingStatus().get(STATUS_IDLE));
-
- Assertions.assertEquals(1, overviewCache.getDataSizeHistory(0).size());
- Assertions.assertEquals(1,
overviewCache.getResourceUsageHistory(0).size());
-
- List<OverviewTopTableItem> allTopTableItem =
overviewCache.getAllTopTableItem();
+ Assertions.assertEquals(1, overviewManager.getTotalCatalog());
+ Assertions.assertEquals(1, overviewManager.getTotalTableCount());
+ Assertions.assertEquals(0, overviewManager.getTotalDataSize());
+ Assertions.assertEquals(0, overviewManager.getTotalCpu());
+ Assertions.assertEquals(0, overviewManager.getTotalMemory());
+
+ Assertions.assertEquals(0,
overviewManager.getOptimizingStatus().get(STATUS_PENDING));
+ Assertions.assertEquals(0,
overviewManager.getOptimizingStatus().get(STATUS_COMMITTING));
+ Assertions.assertEquals(0,
overviewManager.getOptimizingStatus().get(STATUS_EXECUTING));
+ Assertions.assertEquals(0,
overviewManager.getOptimizingStatus().get(STATUS_PLANING));
+ Assertions.assertEquals(1,
overviewManager.getOptimizingStatus().get(STATUS_IDLE));
+
+ Assertions.assertEquals(1, overviewManager.getDataSizeHistory(0).size());
+ Assertions.assertEquals(1,
overviewManager.getResourceUsageHistory(0).size());
+
+ List<OverviewTopTableItem> allTopTableItem =
overviewManager.getAllTopTableItem();
Assertions.assertEquals(1, allTopTableItem.size());
Assertions.assertEquals(-1, allTopTableItem.get(0).getHealthScore());
// insert data
initTableWithFiles();
refreshPending();
- overviewCache.refresh();
+ overviewManager.refresh();
- Assertions.assertTrue(overviewCache.getTotalDataSize() > 0);
+ Assertions.assertTrue(overviewManager.getTotalDataSize() > 0);
- Assertions.assertEquals(1,
overviewCache.getOptimizingStatus().get(STATUS_PENDING));
- Assertions.assertEquals(0,
overviewCache.getOptimizingStatus().get(STATUS_IDLE));
+ Assertions.assertEquals(1,
overviewManager.getOptimizingStatus().get(STATUS_PENDING));
+ Assertions.assertEquals(0,
overviewManager.getOptimizingStatus().get(STATUS_IDLE));
- Assertions.assertEquals(2, overviewCache.getDataSizeHistory(0).size());
- Assertions.assertEquals(2,
overviewCache.getResourceUsageHistory(0).size());
- allTopTableItem = overviewCache.getAllTopTableItem();
+ Assertions.assertEquals(2, overviewManager.getDataSizeHistory(0).size());
+ Assertions.assertEquals(2,
overviewManager.getResourceUsageHistory(0).size());
+ allTopTableItem = overviewManager.getAllTopTableItem();
Assertions.assertEquals(100, allTopTableItem.get(0).getHealthScore());
}
}
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index f3e879601..a4d6b29f2 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -99,6 +99,10 @@ ams:
identifier: default # Built-in support for default/base64. Defaults to
"default", indicating no encryption
sensitive-keywords: admin-password;database.password
+ overview-cache:
+ refresh-interval: 3m # 3 min
+ max-size: 3360 # Keep 7 days history by default, 7 * 24 *
60 / 3 = 3360
+
database:
type: derby
jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver
diff --git a/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
b/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
index 75727df47..40842e817 100644
--- a/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
@@ -19,12 +19,6 @@
# configurations of metric reporters
metric-reporters:
- - name: overview-exporter # configs for overview exporter
- enabled: true # if false, overview page will not
fetch data
- properties:
- refresh-interval: 180000 # 3 min
- max-history-records: 3360 # Keep 7 days history by default, 7
* 24 * 60 / 3 = 3360
-
# - name: prometheus-exporter # configs for prometheus exporter
# enabled: false
# properties: