This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 967ad8300f4 [refactor](stats) Remove useless async loader code.
(#31380) (#31464)
967ad8300f4 is described below
commit 967ad8300f42abee1582d4352ea7e8a8f9cbbd17
Author: Jibing-Li <[email protected]>
AuthorDate: Tue Feb 27 19:47:46 2024 +0800
[refactor](stats) Remove useless async loader code. (#31380) (#31464)
---
.../statistics/ColumnStatisticsCacheLoader.java | 9 ----
.../apache/doris/statistics/StatisticsCache.java | 16 -------
.../doris/statistics/StatisticsCacheLoader.java | 51 +---------------------
.../org/apache/doris/statistics/CacheTest.java | 2 +-
4 files changed, 2 insertions(+), 76 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
index eda3645fd00..0b66fa5e7b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
@@ -18,7 +18,6 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.qe.InternalQueryExecutionException;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
public class ColumnStatisticsCacheLoader extends
StatisticsCacheLoader<Optional<ColumnStatistic>> {
private static final Logger LOG =
LogManager.getLogger(ColumnStatisticsCacheLoader.class);
- private static final ThreadPoolExecutor singleThreadPool =
ThreadPoolManager.newDaemonFixedThreadPool(
- StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE,
- StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD",
- true,
- new DiscardOldestPolicy());
-
@Override
protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
Optional<ColumnStatistic> columnStatistic = Optional.empty();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 20f25eb3e96..62e11f5c9d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -44,7 +44,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public class StatisticsCache {
@@ -75,21 +74,6 @@ public class StatisticsCache {
.executor(threadPool)
.buildAsync(histogramCacheLoader);
- {
- threadPool.submit(() -> {
- while (true) {
- try {
- columnStatisticsCacheLoader.removeExpiredInProgressing();
- histogramCacheLoader.removeExpiredInProgressing();
- } catch (Throwable t) {
- // IGNORE
- }
- Thread.sleep(TimeUnit.MINUTES.toMillis(15));
- }
-
- });
- }
-
public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long
tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index c212851a284..8b49c57f1bb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -22,77 +22,28 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
public abstract class StatisticsCacheLoader<V> implements
AsyncCacheLoader<StatisticsCacheKey, V> {
private static final Logger LOG =
LogManager.getLogger(StatisticsCacheLoader.class);
- private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime<V>>
inProgressing = new HashMap<>();
-
@Override
public @NonNull CompletableFuture<V> asyncLoad(
@NonNull StatisticsCacheKey key,
@NonNull Executor executor) {
- CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
- if (cfWrapper != null) {
- return cfWrapper.cf;
- }
CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
return doLoad(key);
} finally {
long endTime = System.currentTimeMillis();
- LOG.info("Query BE for column stats:{}-{} end time:{} cost
time:{}", key.tableId, key.colName,
- endTime, endTime - startTime);
- removeFromIProgressing(key);
+ LOG.info("Load statistic cache [{}] cost time ms:{}", key,
endTime - startTime);
}
}, executor);
- putIntoIProgressing(key,
- new
CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future));
return future;
}
protected abstract V doLoad(StatisticsCacheKey k);
-
- private static class CompletableFutureWithCreateTime<V> extends
CompletableFuture<V> {
-
- public final long startTime;
- public final CompletableFuture<V> cf;
- private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30);
-
- public CompletableFutureWithCreateTime(long startTime,
CompletableFuture<V> cf) {
- this.startTime = startTime;
- this.cf = cf;
- }
-
- public boolean isExpired() {
- return System.currentTimeMillis() - startTime > expiredTimeMilli;
- }
- }
-
- private void putIntoIProgressing(StatisticsCacheKey k,
CompletableFutureWithCreateTime<V> v) {
- synchronized (inProgressing) {
- inProgressing.put(k, v);
- }
- }
-
- private void removeFromIProgressing(StatisticsCacheKey k) {
- synchronized (inProgressing) {
- inProgressing.remove(k);
- }
- }
-
- public void removeExpiredInProgressing() {
- // Quite simple logic that would complete very fast.
- // Lock on object to avoid ConcurrentModificationException.
- synchronized (inProgressing) {
- inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
- }
- }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
index e3a14ecfc5f..bba6cd0c590 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
@@ -289,7 +289,7 @@ public class CacheTest extends TestWithFeService {
Assertions.assertEquals(6, columnStatistic.minValue);
Assertions.assertEquals(7, columnStatistic.maxValue);
} else {
- System.out.println("Cached is not loaded, skip test.");
+ System.out.println("Cache is not loaded, skip test.");
}
} catch (Throwable t) {
t.printStackTrace();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]