This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f951ca2efb331bafaf22a79de8ce8d78528d23d5
Author: Jibing-Li <[email protected]>
AuthorDate: Mon Feb 26 10:48:48 2024 +0800

    [refactor](stats) Remove useless async loader code. (#31380)
---
 .../doris/statistics/BasicAsyncCacheLoader.java    | 48 ----------------------
 .../statistics/ColumnStatisticsCacheLoader.java    |  9 ----
 .../apache/doris/statistics/StatisticsCache.java   | 16 --------
 .../org/apache/doris/statistics/CacheTest.java     |  2 +-
 4 files changed, 1 insertion(+), 74 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
index e7e488a6e7f..ac5896bb06c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
@@ -22,26 +22,17 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.checkerframework.checker.nullness.qual.NonNull;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 public abstract class BasicAsyncCacheLoader<K, V> implements AsyncCacheLoader<K, V> {
 
     private static final Logger LOG = LogManager.getLogger(BasicAsyncCacheLoader.class);
 
-    private final Map<K, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>();
-
     @Override
     public @NonNull CompletableFuture<V> asyncLoad(
             @NonNull K key,
             @NonNull Executor executor) {
-        CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
-        if (cfWrapper != null) {
-            return cfWrapper.cf;
-        }
         CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> {
             long startTime = System.currentTimeMillis();
             try {
@@ -49,49 +40,10 @@ public abstract class BasicAsyncCacheLoader<K, V> implements AsyncCacheLoader<K,
             } finally {
                 long endTime = System.currentTimeMillis();
                 LOG.info("Load statistic cache [{}] cost time ms:{}", key, endTime - startTime);
-                removeFromIProgressing(key);
             }
         }, executor);
-        putIntoIProgressing(key,
-                new CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future));
         return future;
     }
 
     protected abstract V doLoad(K k);
-
-    private static class CompletableFutureWithCreateTime<V> extends CompletableFuture<V> {
-
-        public final long startTime;
-        public final CompletableFuture<V> cf;
-        private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30);
-
-        public CompletableFutureWithCreateTime(long startTime, CompletableFuture<V> cf) {
-            this.startTime = startTime;
-            this.cf = cf;
-        }
-
-        public boolean isExpired() {
-            return System.currentTimeMillis() - startTime > expiredTimeMilli;
-        }
-    }
-
-    private void putIntoIProgressing(K k, CompletableFutureWithCreateTime<V> v) {
-        synchronized (inProgressing) {
-            inProgressing.put(k, v);
-        }
-    }
-
-    private void removeFromIProgressing(K k) {
-        synchronized (inProgressing) {
-            inProgressing.remove(k);
-        }
-    }
-
-    public void removeExpiredInProgressing() {
-        // Quite simple logic that would complete very fast.
-        // Lock on object to avoid ConcurrentModificationException.
-        synchronized (inProgressing) {
-            inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
index e33cff3107a..a281f9b0ece 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
@@ -18,7 +18,6 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.InternalQueryExecutionException;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
 
 public class ColumnStatisticsCacheLoader extends BasicAsyncCacheLoader<StatisticsCacheKey, Optional<ColumnStatistic>> {
 
     private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);
 
-    private static final ThreadPoolExecutor singleThreadPool = ThreadPoolManager.newDaemonFixedThreadPool(
-            StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE,
-            StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD",
-            true,
-            new DiscardOldestPolicy());
-
     @Override
     protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
         Optional<ColumnStatistic> columnStatistic = Optional.empty();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 20f25eb3e96..62e11f5c9d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public class StatisticsCache {
 
@@ -75,21 +74,6 @@ public class StatisticsCache {
                     .executor(threadPool)
                     .buildAsync(histogramCacheLoader);
 
-    {
-        threadPool.submit(() -> {
-            while (true) {
-                try {
-                    columnStatisticsCacheLoader.removeExpiredInProgressing();
-                    histogramCacheLoader.removeExpiredInProgressing();
-                } catch (Throwable t) {
-                    // IGNORE
-                }
-                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
-            }
-
-        });
-    }
-
     public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
index 00d233ff54b..f23b93624b4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
@@ -266,7 +266,7 @@ public class CacheTest extends TestWithFeService {
                 Assertions.assertEquals(6, columnStatistic.minValue);
                 Assertions.assertEquals(7, columnStatistic.maxValue);
             } else {
-                System.out.println("Cached is not loaded, skip test.");
+                System.out.println("Cache is not loaded, skip test.");
             }
         } catch (Throwable t) {
             t.printStackTrace();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
