This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3040f202a878452dd5f2ea3e8c88954985470927 Author: Xiangyu Wang <[email protected]> AuthorDate: Thu Jul 6 15:18:30 2023 +0800 [Enhancement](multi-catalog) Make meta cache batch loading concurrently. (#21471) I will improve the performance of querying the meta cache of HMS tables in 2 steps: **Step1** : use concurrent batch loading for meta cache **Step2** : execute some other tasks concurrently as soon as possible **This PR is mainly for Step1, and it mainly does the following things:** - Create a `CacheBulkLoader` for batch loading - Remove the executor of the previous async cache loader and change the loader's type to `CacheBulkLoader` (We do not set any refresh strategies for LoadingCache, so the previous executor is not useful) - Use a `FixedCacheThreadPool` to replace the `CacheThreadPool` (The previous `CacheThreadPool` just logs warnings and does not throw any exception when the pool is full). - Remove parallel streams and use the `CacheBulkLoader` to do batch loading - Change the value of `max_external_cache_loader_thread_pool_size` to 64, and set the pool size of hms client pool to `max_external_cache_loader_thread_pool_size` - Fix the spelling mistake in `max_hive_table_catch_num` --- .../main/java/org/apache/doris/common/Config.java | 4 +- .../org/apache/doris/common/ThreadPoolManager.java | 8 ++ .../apache/doris/common/util/CacheBulkLoader.java | 51 +++++++ .../doris/datasource/ExternalMetaCacheMgr.java | 12 +- .../doris/datasource/ExternalSchemaCache.java | 15 +- .../doris/datasource/HMSExternalCatalog.java | 5 +- .../doris/datasource/hive/HiveMetaStoreCache.java | 153 ++++++++++----------- .../apache/doris/common/CacheBulkLoaderTest.java | 84 +++++++++++ 8 files changed, 232 insertions(+), 100 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index ab49c8e2ac..eb2acb6022 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1694,7 +1694,7 @@ public class Config extends ConfigBase { @ConfField(mutable = false, masterOnly = false, description = {"Hive表到分区名列表缓存的最大数量。", "Max cache number of hive table to partition names list."}) - public static long max_hive_table_catch_num = 1000; + public static long max_hive_table_cache_num = 1000; @ConfField(mutable = false, masterOnly = false, description = {"获取Hive分区值时候的最大返回数量,-1代表没有限制。", "Max number of hive partition values to return while list partitions, -1 means no limitation."}) @@ -1705,7 +1705,7 @@ public class Config extends ConfigBase { * Max thread pool size for loading external meta cache */ @ConfField(mutable = false, masterOnly = false) - public static int max_external_cache_loader_thread_pool_size = 10; + public static int max_external_cache_loader_thread_pool_size = 64; /** * Max cache num of external catalog's file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 28c18618d4..be8731b6b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -126,6 +126,14 @@ public class ThreadPoolManager { poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, + String poolName, int timeoutSeconds, + boolean needRegisterMetric) { + return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, timeoutSeconds), + poolName, needRegisterMetric); + } + public static <T> ThreadPoolExecutor newDaemonFixedPriorityThreadPool(int numThread, int initQueueSize, Comparator<T> comparator, Class<T> tClass, String poolName, boolean needRegisterMetric) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java new file mode 100644 index 0000000000..441f22d8b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.common.Pair; + +import com.google.common.cache.CacheLoader; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Streams; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public abstract class CacheBulkLoader<K, V> extends CacheLoader<K, V> { + + protected abstract ExecutorService getExecutor(); + + @Override + public Map<K, V> loadAll(Iterable<? extends K> keys) + throws ExecutionException, InterruptedException { + List<Pair<? 
extends K, Future<V>>> pList = Streams.stream(keys) + .map(key -> Pair.of(key, getExecutor().submit(() -> load(key)))) + .collect(Collectors.toList()); + + Map<K, V> vMap = Maps.newLinkedHashMap(); + for (Pair<? extends K, Future<V>> p : pList) { + vMap.put(p.first, p.second.get()); + } + return ImmutableMap.copyOf(vMap); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 6d05eb2648..f5ca819c3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; /** * Cache meta of external catalog @@ -44,11 +44,13 @@ public class ExternalMetaCacheMgr { private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap(); // catalog id -> table schema cache private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap(); - private Executor executor; + private ExecutorService executor; public ExternalMetaCacheMgr() { - executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_external_cache_loader_thread_pool_size, - "ExternalMetaCacheMgr", true); + executor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.max_external_cache_loader_thread_pool_size, + Config.max_external_cache_loader_thread_pool_size * 1000, + "ExternalMetaCacheMgr", 120, true); } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { @@ -69,7 +71,7 @@ public class ExternalMetaCacheMgr { if (cache == null) { synchronized (schemaCacheMap) { if (!schemaCacheMap.containsKey(catalog.getId())) { - schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, executor)); + schemaCacheMap.put(catalog.getId(), new 
ExternalSchemaCache(catalog)); } cache = schemaCacheMap.get(catalog.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a2b734e144..3eab6028d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -37,31 +37,30 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; // The schema cache for external table public class ExternalSchemaCache { private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class); - private ExternalCatalog catalog; + private final ExternalCatalog catalog; private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache; - public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) { + public ExternalSchemaCache(ExternalCatalog catalog) { this.catalog = catalog; - init(executor); + init(); initMetrics(); } - private void init(Executor executor) { + private void init() { schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(CacheLoader.asyncReloading(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() { + .build(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() { @Override - public ImmutableList<Column> load(SchemaCacheKey key) throws Exception { + public ImmutableList<Column> load(SchemaCacheKey key) { return loadSchema(key); } - }, executor)); + }); } private void initMetrics() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 
1f9f7c59c0..c3e71db222 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -54,7 +54,7 @@ import java.util.Objects; public class HMSExternalCatalog extends ExternalCatalog { private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class); - private static final int MAX_CLIENT_POOL_SIZE = 8; + private static final int MIN_CLIENT_POOL_SIZE = 8; protected PooledHiveMetaStoreClient client; // Record the latest synced event id when processing hive events // Must set to -1 otherwise client.getNextNotification will throw exception @@ -161,7 +161,8 @@ public class HMSExternalCatalog extends ExternalCatalog { } } - client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE); + client = new PooledHiveMetaStoreClient(hiveConf, + Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 2859dd8549..3de4a5d1d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.S3Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.HMSExternalCatalog; @@ -89,11 +90,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.Stream; // The cache of a hms catalog. 3 kind of caches: // 1. partitionValuesCache: cache the partition values of a table, for partition prune. @@ -101,17 +101,15 @@ import java.util.stream.Stream; // 3. fileCache: cache the files of a location. public class HiveMetaStoreCache { private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class); - private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50; public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; // After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'. public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; - private HMSExternalCatalog catalog; + private final HMSExternalCatalog catalog; private JobConf jobConf; - - private Executor executor; + private final ExecutorService executor; // cache from <dbname-tblname> -> <values of partitions> private LoadingCache<PartitionValueCacheKey, HivePartitionValues> partitionValuesCache; @@ -121,7 +119,7 @@ public class HiveMetaStoreCache { private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef = new AtomicReference<>(); - public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) { + public HiveMetaStoreCache(HMSExternalCatalog catalog, ExecutorService executor) { this.catalog = catalog; this.executor = executor; init(); @@ -129,24 +127,40 @@ public class HiveMetaStoreCache { } private void init() { - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_catch_num) + /** + * Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading, + * we need to be very careful and try to avoid the circular dependency of there tasks + * which 
will bring out thread deak-locks. + * */ + partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(CacheLoader.asyncReloading( - new CacheLoader<PartitionValueCacheKey, HivePartitionValues>() { - @Override - public HivePartitionValues load(PartitionValueCacheKey key) throws Exception { - return loadPartitionValues(key); - } - }, executor)); + .build(new CacheBulkLoader<PartitionValueCacheKey, HivePartitionValues>() { + @Override + protected ExecutorService getExecutor() { + return HiveMetaStoreCache.this.executor; + } + + @Override + public HivePartitionValues load(PartitionValueCacheKey key) { + return loadPartitionValues(key); + } + + }); partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(CacheLoader.asyncReloading(new CacheLoader<PartitionCacheKey, HivePartition>() { + .build(new CacheBulkLoader<PartitionCacheKey, HivePartition>() { @Override - public HivePartition load(PartitionCacheKey key) throws Exception { + protected ExecutorService getExecutor() { + return HiveMetaStoreCache.this.executor; + } + + @Override + public HivePartition load(PartitionCacheKey key) { return loadPartitions(key); } - }, executor)); + + }); setNewFileCache(); } @@ -169,10 +183,18 @@ public class HiveMetaStoreCache { if (fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) { fileCacheBuilder.expireAfterWrite(fileMetaCacheTtlSecond, TimeUnit.SECONDS); } - // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader - // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader - CacheLoader<FileCacheKey, FileCacheValue> loader = getGuavaCacheLoader(executor, - fileMetaCacheTtlSecond); + + CacheLoader<FileCacheKey, FileCacheValue> loader = new 
CacheBulkLoader<FileCacheKey, FileCacheValue>() { + @Override + protected ExecutorService getExecutor() { + return HiveMetaStoreCache.this.executor; + } + + @Override + public FileCacheValue load(FileCacheKey key) { + return loadFiles(key); + } + }; LoadingCache<FileCacheKey, FileCacheValue> preFileCache = fileCacheRef.get(); @@ -374,6 +396,13 @@ public class HiveMetaStoreCache { } } + // Replace default hive partition with a null_string. + for (int i = 0; i < result.getValuesSize(); i++) { + if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) { + result.getPartitionValues().set(i, FeConstants.null_string); + } + } + if (LOG.isDebugEnabled()) { LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName()); } @@ -425,36 +454,23 @@ public class HiveMetaStoreCache { public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) { long start = System.currentTimeMillis(); - List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size()); - partitions.stream().forEach(p -> { + List<FileCacheKey> keys = partitions.stream().map(p -> { FileCacheKey fileCacheKey = p.isDummyPartition() ? 
FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), useSelfSplitter) : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues()); fileCacheKey.setUseSelfSplitter(useSelfSplitter); - keys.add(fileCacheKey); - }); + return fileCacheKey; + }).collect(Collectors.toList()); - Stream<FileCacheKey> stream; - if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) { - stream = keys.stream(); - } else { - stream = keys.parallelStream(); + List<FileCacheValue> fileLists; + try { + fileLists = fileCacheRef.get().getAll(keys).values().asList(); + } catch (ExecutionException e) { + throw new CacheException("failed to get files from partitions in catalog %s", + e, catalog.getName()); } - List<FileCacheValue> fileLists = stream.map(k -> { - try { - FileCacheValue fileCacheValue = fileCacheRef.get().get(k); - // Replace default hive partition with a null_string. - for (int i = 0; i < fileCacheValue.getValuesSize(); i++) { - if (HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) { - fileCacheValue.getPartitionValues().set(i, FeConstants.null_string); - } - } - return fileCacheValue; - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); + LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", fileLists.stream().mapToInt(l -> l.getFiles() == null ? (l.getSplits() == null ? 
0 : l.getSplits().size()) : l.getFiles().size()).sum(), @@ -464,22 +480,17 @@ public class HiveMetaStoreCache { public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) { long start = System.currentTimeMillis(); - List<PartitionCacheKey> keys = Lists.newArrayListWithExpectedSize(partitionValuesList.size()); - partitionValuesList.stream().forEach(p -> keys.add(new PartitionCacheKey(dbName, name, p))); + List<PartitionCacheKey> keys = partitionValuesList.stream() + .map(p -> new PartitionCacheKey(dbName, name, p)) + .collect(Collectors.toList()); - Stream<PartitionCacheKey> stream; - if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) { - stream = keys.stream(); - } else { - stream = keys.parallelStream(); + List<HivePartition> partitions; + try { + partitions = partitionCache.getAll(keys).values().asList(); + } catch (ExecutionException e) { + throw new CacheException("failed to get partition in catalog %s", e, catalog.getName()); } - List<HivePartition> partitions = stream.map(k -> { - try { - return partitionCache.get(k); - } catch (ExecutionException e) { - throw new CacheException("failed to get partition for %s in catalog %s", e, k, catalog.getName()); - } - }).collect(Collectors.toList()); + LOG.debug("get #{} partitions in catalog {} cost: {} ms", partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); return partitions; @@ -665,30 +676,6 @@ public class HiveMetaStoreCache { partitionValuesCache.put(key, values); } - /*** - * get the guava CacheLoader - * if the fileMetaCacheTtlSecond equal 0 , the synchronous loader is used - * if the fileMetaCacheTtlSecond greater than 0 , the asynchronous loader is used - * @param executor - * @param fileMetaCacheTtlSecond - * @return - */ - private CacheLoader<FileCacheKey, FileCacheValue> getGuavaCacheLoader(Executor executor, - int fileMetaCacheTtlSecond) { - CacheLoader<FileCacheKey, FileCacheValue> loader = - new 
CacheLoader<FileCacheKey, FileCacheValue>() { - @Override - public FileCacheValue load(FileCacheKey key) throws Exception { - return loadFiles(key); - } - }; - if (fileMetaCacheTtlSecond == HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) { - return loader; - } else { - return CacheLoader.asyncReloading(loader, executor); - } - } - /*** * get fileCache ref * @return diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java new file mode 100644 index 0000000000..0cc54817b3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common; + +import org.apache.doris.common.util.CacheBulkLoader; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.LoadingCache; +import org.apache.commons.collections.MapUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class CacheBulkLoaderTest { + + @Test + public void test() { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 10, 10, "TestThreadPool", 120, true); + + LoadingCache<String, String> testCache = CacheBuilder.newBuilder().maximumSize(100) + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(new CacheBulkLoader<String, String>() { + @Override + protected ExecutorService getExecutor() { + return executor; + } + + @Override + public String load(String key) { + Assertions.assertTrue(Thread.currentThread().getName().startsWith("TestThreadPool")); + try { + Thread.sleep(100); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + return key.replace("k", "v"); + } + }); + + List<String> testKeys = IntStream.range(1, 101).boxed() + .map(i -> String.format("k%d", i)).collect(Collectors.toList()); + try { + Map<String, String> vMap = testCache.getAll(testKeys); + Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size()); + for (String key : vMap.keySet()) { + Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key))); + } + } catch (ExecutionException e) { + e.printStackTrace(); + Assertions.fail(); + } + + try { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + } +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
