This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dc7b2015f5 eh (#18122)
dc7b2015f5 is described below
commit dc7b2015f535f08414ae82c8b2d1c8701371f6aa
Author: AKIRA <[email protected]>
AuthorDate: Mon Mar 27 12:09:35 2023 +0900
eh (#18122)
---
.../apache/doris/statistics/StatisticsCache.java | 32 ++++++++++-
.../doris/statistics/StatisticsCacheLoader.java | 62 +++++++++++++++++-----
2 files changed, 81 insertions(+), 13 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index c6486459a6..9add3c1bc3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -17,6 +17,7 @@
 package org.apache.doris.statistics;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.ConnectContext;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
@@ -26,16 +27,53 @@ import org.apache.logging.log4j.Logger;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 public class StatisticsCache {
     private static final Logger LOG = LogManager.getLogger(StatisticsCache.class);
+    /**
+     * Use a standalone thread pool to avoid interference between this cache and any other JDK
+     * function that uses the threads of ForkJoinPool#common in the system.
+     */
+    private final ThreadPoolExecutor threadPool
+            = ThreadPoolManager.newDaemonFixedThreadPool(
+            10, Integer.MAX_VALUE, "STATS_FETCH", true);
+
+    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
+
     private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
             .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
             .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
             .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .buildAsync(new StatisticsCacheLoader());
+            .executor(threadPool)
+            .buildAsync(cacheLoader);
+
+    {
+        // Periodically evict stale in-progress load entries. This runs on its own daemon thread
+        // rather than on threadPool so that it does not permanently occupy one of the fetch threads.
+        Thread inProgressCleaner = new Thread(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    cacheLoader.removeExpiredInProgressing();
+                } catch (Exception e) {
+                    // Best effort: keep the cleaner alive; the next round retries.
+                    LOG.warn("Failed to remove expired in-progressing statistics loads", e);
+                }
+                try {
+                    // Sleep unconditionally so a failing sweep cannot turn into a busy loop.
+                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                } catch (InterruptedException e) {
+                    // Restore the flag so the loop condition ends the thread.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }, "STATS_CACHE_CLEANER");
+        inProgressCleaner.setDaemon(true);
+        inProgressCleaner.start();
+    }
public ColumnStatistic getColumnStatistics(long tblId, String colName) {
ColumnLevelStatisticCache columnLevelStatisticCache =
getColumnStatistics(tblId, -1, colName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index c592a9b4eb..3417356b78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -33,9 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
public class StatisticsCacheLoader implements
AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> {
@@ -51,18 +50,22 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
     // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
-    private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>>
-            inProgressing = new ConcurrentHashMap<>();
+    // Guarded by synchronizing on the map itself: every read and write must hold that lock.
+    private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime>
+            inProgressing = new HashMap<>();
     @Override
     public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key,
             @NonNull Executor executor) {
-
-        CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key);
-        if (future != null) {
-            return future;
+        CompletableFutureWithCreateTime cfWrapper;
+        // HashMap is not thread-safe, so the lookup must take the same lock as the writers.
+        synchronized (inProgressing) {
+            cfWrapper = inProgressing.get(key);
+        }
+        if (cfWrapper != null) {
+            return cfWrapper.cf;
         }
-        future = CompletableFuture.supplyAsync(() -> {
+        CompletableFuture<ColumnLevelStatisticCache> future = CompletableFuture.supplyAsync(() -> {
             long startTime = System.currentTimeMillis();
             try {
                 LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName,
@@ -107,10 +105,64 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
                 long endTime = System.currentTimeMillis();
                 LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
                         endTime, endTime - startTime);
-                inProgressing.remove(key);
             }
-        });
-        inProgressing.put(key, future);
+        }, executor);
+        CompletableFutureWithCreateTime newEntry =
+                new CompletableFutureWithCreateTime(System.currentTimeMillis(), future);
+        putIntoIProgressing(key, newEntry);
+        // Unregister only after the entry has been published. If the load has already finished,
+        // whenComplete runs the callback right away on this thread, so the removal can never be
+        // overtaken by the put and leave a stale, completed future behind.
+        future.whenComplete((result, error) -> removeFromIProgressing(key, newEntry));
         return future;
     }
+
+    private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) {
+        synchronized (inProgressing) {
+            inProgressing.put(k, v);
+        }
+    }
+
+    /**
+     * Removes {@code k} only while it is still mapped to {@code v}, so a late-finishing load that
+     * was already evicted as expired cannot drop a newer in-flight entry for the same key.
+     */
+    private void removeFromIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) {
+        synchronized (inProgressing) {
+            inProgressing.remove(k, v);
+        }
+    }
+
+    /**
+     * Drops in-progress entries that have already completed or have been registered longer than
+     * the expiry window, so the map cannot accumulate entries indefinitely.
+     */
+    public void removeExpiredInProgressing() {
+        // Quite simple logic that would complete very fast.
+        // Lock on the map because every other access synchronizes on it as well.
+        synchronized (inProgressing) {
+            inProgressing.entrySet().removeIf(e -> e.getValue().cf.isDone() || e.getValue().isExpired());
+        }
+    }
+
+    /**
+     * Pairs an in-flight load with the time it was registered, so entries that are never removed
+     * (for example because the load hangs) are eventually evicted and do not leak memory.
+     */
+    private static class CompletableFutureWithCreateTime {
+
+        private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30);
+
+        public final long startTime;
+        public final CompletableFuture<ColumnLevelStatisticCache> cf;
+
+        public CompletableFutureWithCreateTime(long startTime, CompletableFuture<ColumnLevelStatisticCache> cf) {
+            this.startTime = startTime;
+            this.cf = cf;
+        }
+
+        public boolean isExpired() {
+            return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI;
+        }
+    }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]