This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3040f202a878452dd5f2ea3e8c88954985470927
Author: Xiangyu Wang <[email protected]>
AuthorDate: Thu Jul 6 15:18:30 2023 +0800

    [Enhancement](multi-catalog) Make meta cache batch loading concurrently. 
(#21471)
    
    I will improve the performance of querying the meta cache of HMS tables in 2 
steps:
    **Step1** : use concurrent batch loading for meta cache
    **Step2** : execute some other tasks concurrently as soon as possible
    
    **This PR is mainly for step 1, and it mainly does the following things:**
    - Create a `CacheBulkLoader` for batch loading
    - Remove the executor of the previous async cache loader and change the 
loader's type to `CacheBulkLoader` (We do not set any refresh strategies for 
LoadingCache, so the previous executor is not useful)
    - Use a `FixedCacheThreadPool` to replace the `CacheThreadPool` (The 
previous `CacheThreadPool` just logs warnings and does not throw any 
exceptions when the pool is full).
    - Remove parallel streams and use the `CacheBulkLoader` to do batch loading
    - Change the value of `max_external_cache_loader_thread_pool_size` to 64, 
and set the pool size of hms client pool to 
`max_external_cache_loader_thread_pool_size`
    - Fix the spelling mistake for `max_hive_table_catch_num`
---
 .../main/java/org/apache/doris/common/Config.java  |   4 +-
 .../org/apache/doris/common/ThreadPoolManager.java |   8 ++
 .../apache/doris/common/util/CacheBulkLoader.java  |  51 +++++++
 .../doris/datasource/ExternalMetaCacheMgr.java     |  12 +-
 .../doris/datasource/ExternalSchemaCache.java      |  15 +-
 .../doris/datasource/HMSExternalCatalog.java       |   5 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 153 ++++++++++-----------
 .../apache/doris/common/CacheBulkLoaderTest.java   |  84 +++++++++++
 8 files changed, 232 insertions(+), 100 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ab49c8e2ac..eb2acb6022 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1694,7 +1694,7 @@ public class Config extends ConfigBase {
 
     @ConfField(mutable = false, masterOnly = false, description = 
{"Hive表到分区名列表缓存的最大数量。",
         "Max cache number of hive table to partition names list."})
-    public static long max_hive_table_catch_num = 1000;
+    public static long max_hive_table_cache_num = 1000;
 
     @ConfField(mutable = false, masterOnly = false, description = 
{"获取Hive分区值时候的最大返回数量,-1代表没有限制。",
         "Max number of hive partition values to return while list partitions, 
-1 means no limitation."})
@@ -1705,7 +1705,7 @@ public class Config extends ConfigBase {
      * Max thread pool size for loading external meta cache
      */
     @ConfField(mutable = false, masterOnly = false)
-    public static int max_external_cache_loader_thread_pool_size = 10;
+    public static int max_external_cache_loader_thread_pool_size = 64;
 
     /**
      * Max cache num of external catalog's file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 28c18618d4..be8731b6b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -126,6 +126,14 @@ public class ThreadPoolManager {
                 poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, 
int queueSize,
+                                                              String poolName, 
int timeoutSeconds,
+                                                              boolean 
needRegisterMetric) {
+        return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(queueSize), new 
BlockedPolicy(poolName, timeoutSeconds),
+                poolName, needRegisterMetric);
+    }
+
     public static <T> ThreadPoolExecutor newDaemonFixedPriorityThreadPool(int 
numThread, int initQueueSize,
                                                                           
Comparator<T> comparator, Class<T> tClass,
                                                                           
String poolName, boolean needRegisterMetric) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java
new file mode 100644
index 0000000000..441f22d8b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.common.Pair;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Streams;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+public abstract class CacheBulkLoader<K, V> extends CacheLoader<K, V> {
+
+    protected abstract ExecutorService getExecutor();
+
+    @Override
+    public Map<K, V> loadAll(Iterable<? extends K> keys)
+                throws ExecutionException, InterruptedException {
+        List<Pair<? extends K, Future<V>>> pList = Streams.stream(keys)
+                .map(key -> Pair.of(key, getExecutor().submit(() -> 
load(key))))
+                .collect(Collectors.toList());
+
+        Map<K, V> vMap = Maps.newLinkedHashMap();
+        for (Pair<? extends K, Future<V>> p : pList) {
+            vMap.put(p.first, p.second.get());
+        }
+        return ImmutableMap.copyOf(vMap);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 6d05eb2648..f5ca819c3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Cache meta of external catalog
@@ -44,11 +44,13 @@ public class ExternalMetaCacheMgr {
     private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
     // catalog id -> table schema cache
     private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
-    private Executor executor;
+    private ExecutorService executor;
 
     public ExternalMetaCacheMgr() {
-        executor = 
ThreadPoolManager.newDaemonCacheThreadPool(Config.max_external_cache_loader_thread_pool_size,
-                "ExternalMetaCacheMgr", true);
+        executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                Config.max_external_cache_loader_thread_pool_size,
+                Config.max_external_cache_loader_thread_pool_size * 1000,
+                "ExternalMetaCacheMgr", 120, true);
     }
 
     public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
@@ -69,7 +71,7 @@ public class ExternalMetaCacheMgr {
         if (cache == null) {
             synchronized (schemaCacheMap) {
                 if (!schemaCacheMap.containsKey(catalog.getId())) {
-                    schemaCacheMap.put(catalog.getId(), new 
ExternalSchemaCache(catalog, executor));
+                    schemaCacheMap.put(catalog.getId(), new 
ExternalSchemaCache(catalog));
                 }
                 cache = schemaCacheMap.get(catalog.getId());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a2b734e144..3eab6028d4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -37,31 +37,30 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 // The schema cache for external table
 public class ExternalSchemaCache {
     private static final Logger LOG = 
LogManager.getLogger(ExternalSchemaCache.class);
-    private ExternalCatalog catalog;
+    private final ExternalCatalog catalog;
 
     private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
 
-    public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) {
+    public ExternalSchemaCache(ExternalCatalog catalog) {
         this.catalog = catalog;
-        init(executor);
+        init();
         initMetrics();
     }
 
-    private void init(Executor executor) {
+    private void init() {
         schemaCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num)
                 
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
-                .build(CacheLoader.asyncReloading(new 
CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
+                .build(new CacheLoader<SchemaCacheKey, 
ImmutableList<Column>>() {
                     @Override
-                    public ImmutableList<Column> load(SchemaCacheKey key) 
throws Exception {
+                    public ImmutableList<Column> load(SchemaCacheKey key) {
                         return loadSchema(key);
                     }
-                }, executor));
+                });
     }
 
     private void initMetrics() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 1f9f7c59c0..c3e71db222 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -54,7 +54,7 @@ import java.util.Objects;
 public class HMSExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalCatalog.class);
 
-    private static final int MAX_CLIENT_POOL_SIZE = 8;
+    private static final int MIN_CLIENT_POOL_SIZE = 8;
     protected PooledHiveMetaStoreClient client;
     // Record the latest synced event id when processing hive events
     // Must set to -1 otherwise client.getNextNotification will throw exception
@@ -161,7 +161,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
             }
         }
 
-        client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE);
+        client = new PooledHiveMetaStoreClient(hiveConf,
+                    Math.max(MIN_CLIENT_POOL_SIZE, 
Config.max_external_cache_loader_thread_pool_size));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 2859dd8549..3de4a5d1d2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.S3Util;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -89,11 +90,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 // The cache of a hms catalog. 3 kind of caches:
 // 1. partitionValuesCache: cache the partition values of a table, for 
partition prune.
@@ -101,17 +101,15 @@ import java.util.stream.Stream;
 // 3. fileCache: cache the files of a location.
 public class HiveMetaStoreCache {
     private static final Logger LOG = 
LogManager.getLogger(HiveMetaStoreCache.class);
-    private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
     public static final String HIVE_DEFAULT_PARTITION = 
"__HIVE_DEFAULT_PARTITION__";
     // After hive 3, transactional table's will have file '_orc_acid_version' 
with value >= '2'.
     public static final String HIVE_ORC_ACID_VERSION_FILE = 
"_orc_acid_version";
 
     private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = 
"bucket_";
 
-    private HMSExternalCatalog catalog;
+    private final HMSExternalCatalog catalog;
     private JobConf jobConf;
-
-    private Executor executor;
+    private final ExecutorService executor;
 
     // cache from <dbname-tblname> -> <values of partitions>
     private LoadingCache<PartitionValueCacheKey, HivePartitionValues> 
partitionValuesCache;
@@ -121,7 +119,7 @@ public class HiveMetaStoreCache {
     private volatile AtomicReference<LoadingCache<FileCacheKey, 
FileCacheValue>> fileCacheRef
             = new AtomicReference<>();
 
-    public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
+    public HiveMetaStoreCache(HMSExternalCatalog catalog, ExecutorService 
executor) {
         this.catalog = catalog;
         this.executor = executor;
         init();
@@ -129,24 +127,40 @@ public class HiveMetaStoreCache {
     }
 
     private void init() {
-        partitionValuesCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_catch_num)
+        /**
+         * Because the partitionValuesCache|partitionCache|fileCache use the 
same executor for batch loading,
+         * we need to be very careful and try to avoid the circular dependency 
of there tasks
+         * which will bring out thread deak-locks.
+         * */
+        partitionValuesCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num)
                 
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
-                .build(CacheLoader.asyncReloading(
-                        new CacheLoader<PartitionValueCacheKey, 
HivePartitionValues>() {
-                            @Override
-                            public HivePartitionValues 
load(PartitionValueCacheKey key) throws Exception {
-                                return loadPartitionValues(key);
-                            }
-                        }, executor));
+                .build(new CacheBulkLoader<PartitionValueCacheKey, 
HivePartitionValues>() {
+                    @Override
+                    protected ExecutorService getExecutor() {
+                        return HiveMetaStoreCache.this.executor;
+                    }
+
+                    @Override
+                    public HivePartitionValues load(PartitionValueCacheKey 
key) {
+                        return loadPartitionValues(key);
+                    }
+
+                });
 
         partitionCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
                 
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
-                .build(CacheLoader.asyncReloading(new 
CacheLoader<PartitionCacheKey, HivePartition>() {
+                .build(new CacheBulkLoader<PartitionCacheKey, HivePartition>() 
{
                     @Override
-                    public HivePartition load(PartitionCacheKey key) throws 
Exception {
+                    protected ExecutorService getExecutor() {
+                        return HiveMetaStoreCache.this.executor;
+                    }
+
+                    @Override
+                    public HivePartition load(PartitionCacheKey key) {
                         return loadPartitions(key);
                     }
-                }, executor));
+
+                });
 
         setNewFileCache();
     }
@@ -169,10 +183,18 @@ public class HiveMetaStoreCache {
         if (fileMetaCacheTtlSecond >= 
HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) {
             fileCacheBuilder.expireAfterWrite(fileMetaCacheTtlSecond, 
TimeUnit.SECONDS);
         }
-        // if the file.meta.cache.ttl-second is equal 0, use the synchronous 
loader
-        // if the file.meta.cache.ttl-second greater than 0, use the 
asynchronous loader
-        CacheLoader<FileCacheKey, FileCacheValue> loader = 
getGuavaCacheLoader(executor,
-                fileMetaCacheTtlSecond);
+
+        CacheLoader<FileCacheKey, FileCacheValue> loader = new 
CacheBulkLoader<FileCacheKey, FileCacheValue>() {
+            @Override
+            protected ExecutorService getExecutor() {
+                return HiveMetaStoreCache.this.executor;
+            }
+
+            @Override
+            public FileCacheValue load(FileCacheKey key) {
+                return loadFiles(key);
+            }
+        };
 
         LoadingCache<FileCacheKey, FileCacheValue> preFileCache = 
fileCacheRef.get();
 
@@ -374,6 +396,13 @@ public class HiveMetaStoreCache {
                     }
                 }
 
+                // Replace default hive partition with a null_string.
+                for (int i = 0; i < result.getValuesSize(); i++) {
+                    if 
(HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
+                        result.getPartitionValues().set(i, 
FeConstants.null_string);
+                    }
+                }
+
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("load #{} splits for {} in catalog {}", 
result.getFiles().size(), key, catalog.getName());
                 }
@@ -425,36 +454,23 @@ public class HiveMetaStoreCache {
 
     public List<FileCacheValue> getFilesByPartitions(List<HivePartition> 
partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
-        List<FileCacheKey> keys = 
Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> {
+        List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), 
p.getTblName(), p.getPath(),
                     p.getInputFormat(), useSelfSplitter)
                     : new FileCacheKey(p.getPath(), p.getInputFormat(), 
p.getPartitionValues());
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
-            keys.add(fileCacheKey);
-        });
+            return fileCacheKey;
+        }).collect(Collectors.toList());
 
-        Stream<FileCacheKey> stream;
-        if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
-            stream = keys.stream();
-        } else {
-            stream = keys.parallelStream();
+        List<FileCacheValue> fileLists;
+        try {
+            fileLists = fileCacheRef.get().getAll(keys).values().asList();
+        } catch (ExecutionException e) {
+            throw new CacheException("failed to get files from partitions in 
catalog %s",
+                        e, catalog.getName());
         }
-        List<FileCacheValue> fileLists = stream.map(k -> {
-            try {
-                FileCacheValue fileCacheValue = fileCacheRef.get().get(k);
-                // Replace default hive partition with a null_string.
-                for (int i = 0; i < fileCacheValue.getValuesSize(); i++) {
-                    if 
(HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) {
-                        fileCacheValue.getPartitionValues().set(i, 
FeConstants.null_string);
-                    }
-                }
-                return fileCacheValue;
-            } catch (ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }).collect(Collectors.toList());
+
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} 
ms",
                 fileLists.stream().mapToInt(l -> l.getFiles() == null
                     ? (l.getSplits() == null ? 0 : l.getSplits().size()) : 
l.getFiles().size()).sum(),
@@ -464,22 +480,17 @@ public class HiveMetaStoreCache {
 
     public List<HivePartition> getAllPartitions(String dbName, String name, 
List<List<String>> partitionValuesList) {
         long start = System.currentTimeMillis();
-        List<PartitionCacheKey> keys = 
Lists.newArrayListWithExpectedSize(partitionValuesList.size());
-        partitionValuesList.stream().forEach(p -> keys.add(new 
PartitionCacheKey(dbName, name, p)));
+        List<PartitionCacheKey> keys = partitionValuesList.stream()
+                    .map(p -> new PartitionCacheKey(dbName, name, p))
+                    .collect(Collectors.toList());
 
-        Stream<PartitionCacheKey> stream;
-        if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
-            stream = keys.stream();
-        } else {
-            stream = keys.parallelStream();
+        List<HivePartition> partitions;
+        try {
+            partitions = partitionCache.getAll(keys).values().asList();
+        } catch (ExecutionException e) {
+            throw new CacheException("failed to get partition in catalog %s", 
e, catalog.getName());
         }
-        List<HivePartition> partitions = stream.map(k -> {
-            try {
-                return partitionCache.get(k);
-            } catch (ExecutionException e) {
-                throw new CacheException("failed to get partition for %s in 
catalog %s", e, k, catalog.getName());
-            }
-        }).collect(Collectors.toList());
+
         LOG.debug("get #{} partitions in catalog {} cost: {} ms", 
partitions.size(), catalog.getName(),
                 (System.currentTimeMillis() - start));
         return partitions;
@@ -665,30 +676,6 @@ public class HiveMetaStoreCache {
         partitionValuesCache.put(key, values);
     }
 
-    /***
-     * get the guava CacheLoader
-     * if the fileMetaCacheTtlSecond equal 0 , the synchronous loader is used
-     * if the fileMetaCacheTtlSecond greater than 0 , the asynchronous loader 
is used
-     * @param executor
-     * @param fileMetaCacheTtlSecond
-     * @return
-     */
-    private CacheLoader<FileCacheKey, FileCacheValue> 
getGuavaCacheLoader(Executor executor,
-            int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, FileCacheValue> loader =
-                new CacheLoader<FileCacheKey, FileCacheValue>() {
-                    @Override
-                    public FileCacheValue load(FileCacheKey key) throws 
Exception {
-                        return loadFiles(key);
-                    }
-                };
-        if (fileMetaCacheTtlSecond == 
HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) {
-            return loader;
-        } else {
-            return CacheLoader.asyncReloading(loader, executor);
-        }
-    }
-
     /***
      * get fileCache ref
      * @return
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java
new file mode 100644
index 0000000000..0cc54817b3
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.util.CacheBulkLoader;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.collections.MapUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class CacheBulkLoaderTest {
+
+    @Test
+    public void test() {
+        ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonFixedThreadPool(
+                10, 10, "TestThreadPool", 120, true);
+
+        LoadingCache<String, String> testCache = 
CacheBuilder.newBuilder().maximumSize(100)
+                .expireAfterAccess(1, TimeUnit.MINUTES)
+                .build(new CacheBulkLoader<String, String>() {
+                    @Override
+                    protected ExecutorService getExecutor() {
+                        return executor;
+                    }
+
+                    @Override
+                    public String load(String key) {
+                        
Assertions.assertTrue(Thread.currentThread().getName().startsWith("TestThreadPool"));
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException interruptedException) {
+                            interruptedException.printStackTrace();
+                        }
+                        return key.replace("k", "v");
+                    }
+                });
+
+        List<String> testKeys = IntStream.range(1, 101).boxed()
+                    .map(i -> String.format("k%d", 
i)).collect(Collectors.toList());
+        try {
+            Map<String, String> vMap = testCache.getAll(testKeys);
+            Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == 
testKeys.size());
+            for (String key : vMap.keySet()) {
+                Assertions.assertTrue(key.replace("k", 
"v").equals(vMap.get(key)));
+            }
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+            Assertions.fail();
+        }
+
+        try {
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to