This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f951ca2efb331bafaf22a79de8ce8d78528d23d5
Author: Jibing-Li <[email protected]>
AuthorDate: Mon Feb 26 10:48:48 2024 +0800

    [refactor](stats) Remove useless async loader code. (#31380)
---
 .../doris/statistics/BasicAsyncCacheLoader.java    | 48 ----------------------
 .../statistics/ColumnStatisticsCacheLoader.java    |  9 ----
 .../apache/doris/statistics/StatisticsCache.java   | 16 --------
 .../org/apache/doris/statistics/CacheTest.java     |  2 +-
 4 files changed, 1 insertion(+), 74 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
index e7e488a6e7f..ac5896bb06c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
@@ -22,26 +22,17 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.checkerframework.checker.nullness.qual.NonNull;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 public abstract class BasicAsyncCacheLoader<K, V> implements 
AsyncCacheLoader<K, V> {
 
     private static final Logger LOG = 
LogManager.getLogger(BasicAsyncCacheLoader.class);
 
-    private final Map<K, CompletableFutureWithCreateTime<V>> inProgressing = 
new HashMap<>();
-
     @Override
     public @NonNull CompletableFuture<V> asyncLoad(
             @NonNull K key,
             @NonNull Executor executor) {
-        CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
-        if (cfWrapper != null) {
-            return cfWrapper.cf;
-        }
         CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> {
             long startTime = System.currentTimeMillis();
             try {
@@ -49,49 +40,10 @@ public abstract class BasicAsyncCacheLoader<K, V> 
implements AsyncCacheLoader<K,
             } finally {
                 long endTime = System.currentTimeMillis();
                 LOG.info("Load statistic cache [{}] cost time ms:{}", key, 
endTime - startTime);
-                removeFromIProgressing(key);
             }
         }, executor);
-        putIntoIProgressing(key,
-                new 
CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future));
         return future;
     }
 
     protected abstract V doLoad(K k);
-
-    private static class CompletableFutureWithCreateTime<V> extends 
CompletableFuture<V> {
-
-        public final long startTime;
-        public final CompletableFuture<V> cf;
-        private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30);
-
-        public CompletableFutureWithCreateTime(long startTime, 
CompletableFuture<V> cf) {
-            this.startTime = startTime;
-            this.cf = cf;
-        }
-
-        public boolean isExpired() {
-            return System.currentTimeMillis() - startTime > expiredTimeMilli;
-        }
-    }
-
-    private void putIntoIProgressing(K k, CompletableFutureWithCreateTime<V> 
v) {
-        synchronized (inProgressing) {
-            inProgressing.put(k, v);
-        }
-    }
-
-    private void removeFromIProgressing(K k) {
-        synchronized (inProgressing) {
-            inProgressing.remove(k);
-        }
-    }
-
-    public void removeExpiredInProgressing() {
-        // Quite simple logic that would complete very fast.
-        // Lock on object to avoid ConcurrentModificationException.
-        synchronized (inProgressing) {
-            inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
index e33cff3107a..a281f9b0ece 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java
@@ -18,7 +18,6 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.InternalQueryExecutionException;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
 
 public class ColumnStatisticsCacheLoader extends 
BasicAsyncCacheLoader<StatisticsCacheKey, Optional<ColumnStatistic>> {
 
     private static final Logger LOG = 
LogManager.getLogger(ColumnStatisticsCacheLoader.class);
 
-    private static final ThreadPoolExecutor singleThreadPool = 
ThreadPoolManager.newDaemonFixedThreadPool(
-            StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE,
-            StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD",
-            true,
-            new DiscardOldestPolicy());
-
     @Override
     protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
         Optional<ColumnStatistic> columnStatistic = Optional.empty();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 20f25eb3e96..62e11f5c9d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public class StatisticsCache {
 
@@ -75,21 +74,6 @@ public class StatisticsCache {
                     .executor(threadPool)
                     .buildAsync(histogramCacheLoader);
 
-    {
-        threadPool.submit(() -> {
-            while (true) {
-                try {
-                    columnStatisticsCacheLoader.removeExpiredInProgressing();
-                    histogramCacheLoader.removeExpiredInProgressing();
-                } catch (Throwable t) {
-                    // IGNORE
-                }
-                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
-            }
-
-        });
-    }
-
     public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long 
tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
index 00d233ff54b..f23b93624b4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
@@ -266,7 +266,7 @@ public class CacheTest extends TestWithFeService {
                 Assertions.assertEquals(6, columnStatistic.minValue);
                 Assertions.assertEquals(7, columnStatistic.maxValue);
             } else {
-                System.out.println("Cached is not loaded, skip test.");
+                System.out.println("Cache is not loaded, skip test.");
             }
         } catch (Throwable t) {
             t.printStackTrace();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to