This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 893e5a3a2eb [fix](fe) Avoid blocking external meta cache refresh on slow miss load (#64705)
893e5a3a2eb is described below
commit 893e5a3a2eb47d903cca626fbd8f1c4a51b4ef7c
Author: Wen Zhenghu <[email protected]>
AuthorDate: Wed Jun 24 18:52:10 2026 +0800
[fix](fe) Avoid blocking external meta cache refresh on slow miss load (#64705)
### What problem does this PR solve?
Issue Number: N/A
Related PR: N/A
Problem Summary:
This PR keeps slow miss loads from blocking external meta cache
invalidation in FE. Previously, `MetaCacheEntry` relied on Caffeine's
synchronous loading path for cache misses, which runs the loader while
holding the lock of the key's `ConcurrentHashMap` hash bin. When an
external metadata loader became slow, operations that invalidate the same
cache, such as `REFRESH CATALOG` and the corresponding replay path, could
wait on that slow load, which in turn blocked the replay-related
invalidation flow.
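The blocking follows from how the synchronous path computes a miss. As we understand Caffeine's bounded cache, `LoadingCache.get()` runs the loader inside a `ConcurrentHashMap` compute call, which holds the lock of the key's hash bin until the loader returns, so an invalidation that has to remove an entry from that bin waits for the load. A minimal sketch of the effect, using a plain `ConcurrentHashMap` rather than Caffeine or Doris code, and the same key for the load and the removal for simplicity; `ComputeBlocksRemoveDemo` and its methods are hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ComputeBlocksRemoveDemo {
    // Returns true when remove() on the key being computed could not finish
    // until the slow mapping function was released.
    static boolean removeWaitsForSlowLoad() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        CountDownLatch loadStarted = new CountDownLatch(1);
        CountDownLatch releaseLoad = new CountDownLatch(1);
        CountDownLatch removeDone = new CountDownLatch(1);

        // Simulates a slow external metadata load running inside the map's compute.
        Thread loader = new Thread(() -> map.computeIfAbsent("k", key -> {
            loadStarted.countDown();
            awaitQuietly(releaseLoad);
            return 1;
        }));
        // Simulates the invalidation triggered by REFRESH CATALOG.
        Thread invalidator = new Thread(() -> {
            map.remove("k");
            removeDone.countDown();
        });
        try {
            loader.start();
            loadStarted.await();
            invalidator.start();
            // Give remove() 300 ms; it should still be waiting on the bin lock.
            boolean finishedWhileLoading = removeDone.await(300, TimeUnit.MILLISECONDS);
            releaseLoad.countDown();
            loader.join();
            invalidator.join();
            // After the load is released, remove() proceeds and empties the map.
            return !finishedWhileLoading && map.isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("remove blocked by slow load: " + removeWaitsForSlowLoad());
    }
}
```

In the real reproduction the invalidated keys are different keys that happen to share the slow key's bin, which contend on the same lock.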
Implementation summary:
- Keep the existing `LoadingCache` to preserve current hit-path behavior
and `refreshAfterWrite` support.
- Add a manual miss-load path behind a new FE config switch: it looks up
with `getIfPresent()` and, on a miss, runs the loader outside Caffeine
instead of calling the synchronous `LoadingCache.get()`.
- Deduplicate concurrent miss loads with striped locks inside
`MetaCacheEntry`.
- Add an entry-level `invalidateGeneration` counter. Each invalidate
increments the generation before clearing cache state.
- Record the generation before a manual miss load, check it once before
`put()`, and check it again after `put()`. If invalidation happens
during the race window, the just-loaded value is removed so stale data
is not kept in cache.
- Keep null miss-load results uncached so the manual path does not
attempt to put null into Caffeine.
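The manual miss-load steps listed above can be sketched with a `ConcurrentHashMap` standing in for Caffeine's plain `Cache` view. `GenerationGuardedCache` is a hypothetical, simplified class, not the actual `MetaCacheEntry`: it keeps only the striped-lock deduplication, the generation check before and after `put()`, and the rule that null results are returned without being cached. Stats tracking, the disabled-entry bypass, and the `LoadingCache` fallback are omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical, simplified stand-in for MetaCacheEntry's manual miss-load path.
// A ConcurrentHashMap plays the role of Caffeine's plain Cache view.
final class GenerationGuardedCache<K, V> {
    private static final int STRIPES = 128;

    private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();
    private final Object[] loadLocks = new Object[STRIPES];
    private final AtomicLong invalidateGeneration = new AtomicLong();

    GenerationGuardedCache() {
        for (int i = 0; i < STRIPES; i++) {
            loadLocks[i] = new Object();
        }
    }

    V get(K key, Function<K, V> loader) {
        // Hit path: a plain lookup that never waits on another thread's load.
        V cached = data.get(key);
        if (cached != null) {
            return cached;
        }
        synchronized (loadLockFor(key)) {
            // Re-check: a concurrent miss on the same key may have filled the cache.
            cached = data.get(key);
            if (cached != null) {
                return cached;
            }
            long generation = invalidateGeneration.get();
            // Slow I/O runs here, outside any map bin lock, so invalidate() never waits on it.
            V loaded = loader.apply(key);
            if (loaded == null || generation != invalidateGeneration.get()) {
                // Null results, and loads that raced an invalidate, are returned without caching.
                return loaded;
            }
            data.put(key, loaded);
            if (generation != invalidateGeneration.get()) {
                // An invalidate landed between the check and put(): drop only our own value.
                data.computeIfPresent(key, (k, current) -> current == loaded ? null : current);
            }
            return loaded;
        }
    }

    V getIfPresent(K key) {
        return data.get(key);
    }

    void invalidate(K key) {
        // Bump the generation first so in-flight loads notice before the removal.
        invalidateGeneration.incrementAndGet();
        data.remove(key);
    }

    private Object loadLockFor(K key) {
        // Same key always maps to the same lock; the monitor count stays fixed.
        return loadLocks[(key.hashCode() & Integer.MAX_VALUE) % STRIPES];
    }
}
```

Striping caps the number of monitors at 128 regardless of how many keys are cached, at the cost that unrelated keys hashing to the same stripe serialize their miss loads. The `computeIfPresent` cleanup removes the entry only if it is still the instance this request stored, so a newer value written by another request is left alone.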
Configuration:
- Add FE config `enable_external_meta_cache_manual_miss_load` (mutable at
runtime), default `true`.
- When it is `false`, `MetaCacheEntry` keeps the original synchronous
Caffeine miss-load behavior.
- When it is `true`, `MetaCacheEntry` uses the manual miss-load path
plus `invalidateGeneration` protection.
Scope and limitations:
- This change applies to `MetaCacheEntry` used by external metadata
cache paths in FE. It does not cover the legacy `MetaCache`.
- `LegacyMetaCacheFactory` is intentionally not refactored in this PR. A
follow-up PR will rework that path with `MetaCache`, so the legacy factory
changes are deferred to that dedicated refactor.
- The protection is designed for manual miss loads. It does not make
Caffeine's asynchronous `refreshAfterWrite` reload generation-aware.
- As a result, `refreshAfterWrite` is preserved, but an asynchronous
refresh that completes after an invalidate may still write its result
back. This is an intentional trade-off in this version.
- The new regression case serves as a reference and can run in suitable
environments, but it may be skipped in standard CI because it depends on
the JDBC regression setup, FE debug points, and an external MySQL/JDBC
environment.
### Release note
None
### Check List (For Author)
- Test
- [ ] Regression test
- [x] Unit Test
- [x] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason
Manual test:
1. Reproduced the blocking path with `REFRESH CATALOG` against a JDBC
external catalog and a debug point that sleeps in
`PluginDrivenExternalTable.initSchema`.
2. Repeated the baseline scenario 5 times with
`enable_external_meta_cache_manual_miss_load=false` and observed
`REFRESH CATALOG` blocked for about 14s while `DESC` stayed slow.
3. Repeated the optimized scenario 5 times with
`enable_external_meta_cache_manual_miss_load=true` and observed
`REFRESH CATALOG` return within about 1s while `DESC` remained slow.
4. Added a regression case as a manual-test reference because its
execution depends on JDBC regression environment and FE debug-point
availability.
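The reproduction relies on preheated schema-cache keys that share a hash bin with the slow key, so that invalidating them has to wait on the in-progress load; this is why the regression script searches for colliding table names. A Java sketch of that search under the same assumptions the script makes: the schema cache key hashes like `Objects.hash(catalogId, db, table, db, table)`, and the backing table has 512 bins. `BinCollisionFinder` is a hypothetical helper, and `spread` mirrors the high-bit folding `ConcurrentHashMap` applies before masking with `tableSize - 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helper mirroring the regression script's collision search.
final class BinCollisionFinder {
    // ConcurrentHashMap folds the high 16 bits into the low bits before masking.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & 0x7fffffff;
    }

    // Assumption (taken from the regression script): the schema cache key hashes like
    // Objects.hash(catalogId, db, table, db, table). tableSize must be a power of two.
    static int binIndex(long catalogId, String db, String table, int tableSize) {
        int h = Objects.hash(catalogId, db, table, db, table);
        return spread(h) & (tableSize - 1);
    }

    // Returns up to `wanted` generated table names that share the slow table's bin.
    static List<String> findCollisions(long catalogId, String db, String slowTable,
                                       int tableSize, int wanted, int maxCandidates) {
        int target = binIndex(catalogId, db, slowTable, tableSize);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < maxCandidates && names.size() < wanted; i++) {
            String candidate = String.format("collide_%06d", i);
            if (binIndex(catalogId, db, candidate, tableSize) == target) {
                names.add(candidate);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Example values only; the regression script uses the real catalog id and db name.
        System.out.println(findCollisions(12345L, "jdbc_manual_miss_load_db_x",
                "slow_probe_000000", 512, 5, 300_000));
    }
}
```

If the real table size or key hash differs from these assumptions, the generated names will not collide and the baseline run will not block.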
Unit test:
- `FE_UT_PARALLEL=1 ./run-fe-ut.sh --run org.apache.doris.datasource.metacache.MetaCacheEntryTest`
- Behavior changed:
- [x] Yes.
Behavior change:
- `REFRESH CATALOG` and the corresponding FE invalidation path are no
longer blocked by slow external metadata miss loads in this
`MetaCacheEntry` implementation.
- Does this need documentation?
- [x] No.
- [ ] Yes.
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
---
.../main/java/org/apache/doris/common/Config.java | 5 +
.../doris/datasource/metacache/MetaCacheEntry.java | 129 ++++++++++-
.../datasource/metacache/MetaCacheEntryTest.java | 197 ++++++++++++++++-
...st_jdbc_refresh_catalog_manual_miss_load.groovy | 243 +++++++++++++++++++++
4 files changed, 563 insertions(+), 11 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ff1b7502f23..3fab9c36023 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2193,6 +2193,11 @@ public class Config extends ConfigBase {
@ConfField(description = {"The auto-refresh interval of the external meta cache."})
public static long external_cache_refresh_time_minutes = 10; // 10 mins
+ // Enable manual miss load for external meta cache to avoid blocking replayer on slow loaders.
+ @ConfField(mutable = true, masterOnly = false,
+ description = {"Whether external meta cache uses manual miss load instead of Caffeine sync load."})
+ public static boolean enable_external_meta_cache_manual_miss_load = true;
+
/**
* Github workflow test type, for setting some session variables
* only for certain test type. E.g. only settting batch_size to small
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
index 8913cd8f4ca..30668163539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.metacache;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
@@ -39,14 +40,28 @@ import javax.annotation.Nullable;
* key/predicate/full invalidation, and lightweight runtime stats.
*/
public class MetaCacheEntry<K, V> {
+ // Use striped locks to deduplicate slow external loads without managing per-key lock lifecycle.
+ private static final int LOAD_LOCK_STRIPES = 128;
+
private final String name;
@Nullable
private final Function<K, V> loader;
private final CacheSpec cacheSpec;
private final boolean effectiveEnabled;
private final boolean autoRefresh;
- private final LoadingCache<K, V> data;
+ // Keep the loading cache for refreshAfterWrite and the legacy sync-load path when the feature is disabled.
+ private final LoadingCache<K, V> loadingData;
+ // Use the plain cache view for manual miss load so slow I/O does not happen in Caffeine's sync load path.
+ private final Cache<K, V> data;
+ // Protect one key stripe at a time to deduplicate concurrent miss loads with bounded lock count.
+ private final Object[] loadLocks = new Object[LOAD_LOCK_STRIPES];
private final AtomicLong invalidateCount = new AtomicLong(0);
+ // Bump generation before invalidation so in-flight manual loads do not repopulate stale values.
+ private final AtomicLong invalidateGeneration = new AtomicLong(0);
+ // Track load statistics outside Caffeine because manual miss loads bypass the built-in load counters.
+ private final AtomicLong loadSuccessCount = new AtomicLong(0);
+ private final AtomicLong loadFailureCount = new AtomicLong(0);
+ private final AtomicLong totalLoadTimeNanos = new AtomicLong(0);
private final AtomicLong lastLoadSuccessTimeMs = new AtomicLong(-1L);
private final AtomicLong lastLoadFailureTimeMs = new AtomicLong(-1L);
private final AtomicReference<String> lastError = new AtomicReference<>("");
@@ -92,7 +107,12 @@ public class MetaCacheEntry<K, V> {
maxSize,
true,
null);
- this.data = cacheFactory.buildCache(this::loadFromDefaultLoader, refreshExecutor);
+ this.loadingData = cacheFactory.buildCache(this::loadFromDefaultLoader, refreshExecutor);
+ this.data = loadingData;
+ // Initialize striped locks eagerly to keep the hot path allocation-free.
+ for (int i = 0; i < loadLocks.length; i++) {
+ loadLocks[i] = new Object();
+ }
}
public String name() {
@@ -100,29 +120,43 @@ public class MetaCacheEntry<K, V> {
}
public V get(K key) {
- return data.get(key);
+ if (!isManualMissLoadEnabled()) {
+ return loadingData.get(key);
+ }
+ return getWithManualLoad(key, this::applyDefaultLoader);
}
public V get(K key, Function<K, V> missLoader) {
Function<K, V> loadFunction = Objects.requireNonNull(missLoader, "missLoader can not be null");
- return data.get(key, typedKey -> loadAndTrack(typedKey, loadFunction));
+ if (!isManualMissLoadEnabled()) {
+ return loadingData.get(key, typedKey -> loadAndTrack(typedKey, loadFunction));
+ }
+ return getWithManualLoad(key, loadFunction);
}
public V getIfPresent(K key) {
+ if (!effectiveEnabled) {
+ return null;
+ }
return data.getIfPresent(key);
}
public void put(K key, V value) {
+ if (!effectiveEnabled) {
+ return;
+ }
data.put(key, value);
}
public void invalidateKey(K key) {
+ invalidateGeneration.incrementAndGet();
if (data.asMap().remove(key) != null) {
invalidateCount.incrementAndGet();
}
}
public void invalidateIf(Predicate<K> predicate) {
+ invalidateGeneration.incrementAndGet();
data.asMap().keySet().removeIf(key -> {
if (predicate.test(key)) {
invalidateCount.incrementAndGet();
@@ -133,6 +167,7 @@ public class MetaCacheEntry<K, V> {
}
public void invalidateAll() {
+ invalidateGeneration.incrementAndGet();
long size = data.estimatedSize();
data.invalidateAll();
invalidateCount.addAndGet(size);
@@ -143,7 +178,11 @@ public class MetaCacheEntry<K, V> {
}
public MetaCacheEntryStats stats() {
- CacheStats cacheStats = data.stats();
+ CacheStats cacheStats = loadingData.stats();
+ long successCount = loadSuccessCount.get();
+ long failureCount = loadFailureCount.get();
+ long totalLoadTime = totalLoadTimeNanos.get();
+ long totalLoadCount = successCount + failureCount;
return new MetaCacheEntryStats(
cacheSpec.isEnable(),
effectiveEnabled,
@@ -155,10 +194,10 @@ public class MetaCacheEntry<K, V> {
cacheStats.hitCount(),
cacheStats.missCount(),
cacheStats.hitRate(),
- cacheStats.loadSuccessCount(),
- cacheStats.loadFailureCount(),
- cacheStats.totalLoadTime(),
- cacheStats.averageLoadPenalty(),
+ successCount,
+ failureCount,
+ totalLoadTime,
+ totalLoadCount == 0 ? 0D : (double) totalLoadTime / totalLoadCount,
cacheStats.evictionCount(),
invalidateCount.get(),
lastLoadSuccessTimeMs.get(),
@@ -166,20 +205,90 @@ public class MetaCacheEntry<K, V> {
lastError.get());
}
+ // Read the config dynamically so existing cache entries follow runtime config updates.
+ private boolean isManualMissLoadEnabled() {
+ return Config.enable_external_meta_cache_manual_miss_load;
+ }
+
+ // Execute slow miss loads outside Caffeine's sync load path and suppress stale write-back after invalidation.
+ private V getWithManualLoad(K key, Function<K, V> loadFunction) {
+ if (!effectiveEnabled) {
+ // Bypass cache entirely when the entry is disabled so manual miss load does not relax disable semantics.
+ return loadAndTrack(key, loadFunction);
+ }
+
+ V value = data.getIfPresent(key);
+ if (value != null) {
+ return value;
+ }
+
+ synchronized (loadLock(key)) {
+ value = data.asMap().get(key);
+ if (value != null) {
+ return value;
+ }
+
+ long generation = invalidateGeneration.get();
+ V loaded = loadAndTrack(key, loadFunction);
+ if (generation != invalidateGeneration.get()) {
+ return loaded;
+ }
+
+ // Keep null results uncached so manual miss load matches LoadingCache null-return behavior.
+ if (loaded == null) {
+ return null;
+ }
+
+ // Leave a narrow hook for tests to pause exactly before the cache put race window.
+ beforeManualCachePutForTest(key, loaded);
+ data.put(key, loaded);
+ if (generation != invalidateGeneration.get()) {
+ removeLoadedValue(key, loaded);
+ }
+ return loaded;
+ }
+ }
+
+ // Remove only the value loaded by the current request and keep newer replacements intact.
+ private void removeLoadedValue(K key, V loaded) {
+ data.asMap().computeIfPresent(key, (ignored, currentValue) -> currentValue == loaded ? null : currentValue);
+ }
+
+ // Map keys to a fixed lock stripe set to bound memory usage while keeping same-key deduplication.
+ private Object loadLock(K key) {
+ int hash = key == null ? 0 : key.hashCode();
+ return loadLocks[(hash & Integer.MAX_VALUE) % loadLocks.length];
+ }
+
+ // Let tests pause between the first generation check and data.put without affecting production behavior.
+ void beforeManualCachePutForTest(K key, V loaded) {
+ }
+
private V loadFromDefaultLoader(K key) {
+ return loadAndTrack(key, this::applyDefaultLoader);
+ }
+
+ // Resolve the default loader separately so the manual path can share tracking without double counting.
+ private V applyDefaultLoader(K key) {
if (loader == null) {
throw new UnsupportedOperationException(
String.format("Entry '%s' requires a contextual miss loader.", name));
}
- return loadAndTrack(key, loader);
+ return loader.apply(key);
}
+ // Track load outcomes locally because manual miss loads do not contribute to Caffeine load statistics.
private V loadAndTrack(K key, Function<K, V> loadFunction) {
+ long startNanos = System.nanoTime();
try {
V value = loadFunction.apply(key);
+ loadSuccessCount.incrementAndGet();
+ totalLoadTimeNanos.addAndGet(System.nanoTime() - startNanos);
lastLoadSuccessTimeMs.set(System.currentTimeMillis());
return value;
} catch (RuntimeException | Error e) {
+ loadFailureCount.incrementAndGet();
+ totalLoadTimeNanos.addAndGet(System.nanoTime() - startNanos);
lastLoadFailureTimeMs.set(System.currentTimeMillis());
lastError.set(e.toString());
throw e;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
index 290eed874ed..7583ac3b075 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
@@ -17,6 +17,8 @@
package org.apache.doris.datasource.metacache;
+import org.apache.doris.common.Config;
+
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.junit.Assert;
@@ -24,14 +26,19 @@ import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MetaCacheEntryTest {
@Test
public void testRefreshUsesConfiguredLoader() throws Exception {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
try {
Map<String, String> properties = Maps.newHashMap();
@@ -58,6 +65,7 @@ public class MetaCacheEntryTest {
}
Assert.assertTrue("refresh should trigger loader invocation", loadCounter.get() >= 2);
} finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
refreshExecutor.shutdownNow();
}
}
@@ -211,9 +219,196 @@ public class MetaCacheEntryTest {
}
}
+ @Test
+ public void testManualMissLoadDeduplicatesSameKey() throws Exception {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
+ ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+ ExecutorService queryExecutor = Executors.newFixedThreadPool(2);
+ try {
+ CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+ CountDownLatch loaderStarted = new CountDownLatch(1);
+ CountDownLatch releaseLoader = new CountDownLatch(1);
+ AtomicInteger loadCounter = new AtomicInteger();
+ MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+ "test",
+ key -> {
+ loaderStarted.countDown();
+ awaitLatch(releaseLoader);
+ return loadCounter.incrementAndGet();
+ },
+ cacheSpec,
+ refreshExecutor,
+ false);
+
+ Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+ Assert.assertTrue(loaderStarted.await(3L, TimeUnit.SECONDS));
+ Future<Integer> second = queryExecutor.submit(() -> entry.get("k"));
+ releaseLoader.countDown();
+
+ Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+ Assert.assertEquals(Integer.valueOf(1), second.get(3L, TimeUnit.SECONDS));
+ Assert.assertEquals(1, loadCounter.get());
+ } finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+ queryExecutor.shutdownNow();
+ refreshExecutor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testManualMissLoadDoesNotPutAfterInvalidate() throws Exception {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
+ ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+ ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
+ try {
+ CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+ CountDownLatch loaderStarted = new CountDownLatch(1);
+ CountDownLatch releaseLoader = new CountDownLatch(1);
+ AtomicInteger loadCounter = new AtomicInteger();
+ MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+ "test",
+ key -> {
+ loaderStarted.countDown();
+ awaitLatch(releaseLoader);
+ return loadCounter.incrementAndGet();
+ },
+ cacheSpec,
+ refreshExecutor,
+ false);
+
+ Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+ Assert.assertTrue(loaderStarted.await(3L, TimeUnit.SECONDS));
+ entry.invalidateKey("k");
+ releaseLoader.countDown();
+
+ Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+ Assert.assertNull(entry.getIfPresent("k"));
+ Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+ Assert.assertEquals(2, loadCounter.get());
+ } finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+ queryExecutor.shutdownNow();
+ refreshExecutor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testManualMissLoadRemovesValueWhenInvalidateHappensBeforePut() throws Exception {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
+ ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+ ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
+ try {
+ CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+ CountDownLatch beforePutStarted = new CountDownLatch(1);
+ CountDownLatch releaseBeforePut = new CountDownLatch(1);
+ AtomicInteger loadCounter = new AtomicInteger();
+ MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<String, Integer>(
+ "test",
+ key -> loadCounter.incrementAndGet(),
+ cacheSpec,
+ refreshExecutor,
+ false) {
+ @Override
+ void beforeManualCachePutForTest(String key, Integer loaded) {
+ beforePutStarted.countDown();
+ awaitLatch(releaseBeforePut);
+ }
+ };
+
+ Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+ Assert.assertTrue(beforePutStarted.await(3L, TimeUnit.SECONDS));
+ entry.invalidateKey("k");
+ releaseBeforePut.countDown();
+
+ Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+ Assert.assertNull(entry.getIfPresent("k"));
+ Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+ Assert.assertEquals(2, loadCounter.get());
+ } finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+ queryExecutor.shutdownNow();
+ refreshExecutor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testManualMissLoadAllowsNullWithoutCaching() {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
+ ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+ try {
+ CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+ MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+ "test",
+ String::length,
+ cacheSpec,
+ refreshExecutor,
+ false);
+ AtomicInteger missLoaderCounter = new AtomicInteger();
+
+ // Verify manual miss load returns null directly and retries because null values are not cached.
+ Assert.assertNull(entry.get("missing", key -> {
+ missLoaderCounter.incrementAndGet();
+ return null;
+ }));
+ Assert.assertNull(entry.getIfPresent("missing"));
+ Assert.assertNull(entry.get("missing", key -> {
+ missLoaderCounter.incrementAndGet();
+ return null;
+ }));
+ Assert.assertEquals(2, missLoaderCounter.get());
+ } finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+ refreshExecutor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testManualMissLoadDoesNotCacheWhenEntryDisabled() {
+ boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+ Config.enable_external_meta_cache_manual_miss_load = true;
+ ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+ try {
+ CacheSpec cacheSpec = CacheSpec.of(false, CacheSpec.CACHE_NO_TTL, 10L);
+ AtomicInteger loadCounter = new AtomicInteger();
+ MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+ "test",
+ key -> loadCounter.incrementAndGet(),
+ cacheSpec,
+ refreshExecutor,
+ false);
+
+ // Verify disabled entries bypass cache entirely even when manual miss load is enabled by config.
+ Assert.assertEquals(Integer.valueOf(1), entry.get("k"));
+ Assert.assertNull(entry.getIfPresent("k"));
+ Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+ Assert.assertNull(entry.getIfPresent("k"));
+ Assert.assertEquals(2, loadCounter.get());
+
+ entry.put("k", 100);
+ Assert.assertNull(entry.getIfPresent("k"));
+ } finally {
+ Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+ refreshExecutor.shutdownNow();
+ }
+ }
+
+ // Keep the loader blocking helper in one place so concurrent tests stay readable.
+ private void awaitLatch(CountDownLatch latch) {
+ try {
+ Assert.assertTrue(latch.await(3L, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
@SuppressWarnings("unchecked")
private LoadingCache<String, Integer> extractLoadingCache(MetaCacheEntry<String, Integer> entry) throws Exception {
- Field dataField = MetaCacheEntry.class.getDeclaredField("data");
+ Field dataField = MetaCacheEntry.class.getDeclaredField("loadingData");
dataField.setAccessible(true);
Object raw = dataField.get(entry);
Assert.assertTrue(raw instanceof LoadingCache);
diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy
new file mode 100644
index 00000000000..f9e942c277a
--- /dev/null
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.UUID
+
+// This case depends on JDBC regression configs, FE debug points, and a reachable
+// external MySQL environment. Those prerequisites may be unavailable in the
+// default Apache Doris CI pipeline, so the case can be skipped or may not run
+// end-to-end there. That is expected. The case still serves as a valuable
+// reference for manual validation of the MetaCacheEntry refresh blocking fix.
+suite("test_jdbc_refresh_catalog_manual_miss_load", "p0,external") {
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String mysqlPort = context.config.otherConfigs.get("mysql_57_port")
+ String s3Endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driverUrl = "https://${bucket}.${s3Endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+ String nameSuffix = UUID.randomUUID().toString().replace("-", "")
+ String catalogName = "jdbc_manual_miss_load_" + nameSuffix
+ String remoteDbName = "jdbc_manual_miss_load_db_" + nameSuffix.substring(0, 12)
+ String slowTableName = "slow_probe_000000"
+ int collisionTableCount = 80
+ int hashMod = 512
+ int slowSleepMs = 15000
+ int refreshDelayMs = 5000
+
+ def mysqlJdbcUrl = "jdbc:mysql://${externalEnvIp}:${mysqlPort}"
+ def configRows = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'enable_external_meta_cache_manual_miss_load';"""
+ String originalManualMissLoadValue = configRows[0][1].toString()
+ def debugPointRows = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'enable_debug_points';"""
+ // Skip the case when FE debug points are not enabled in the regression environment.
+ if (!"true".equalsIgnoreCase(debugPointRows[0][1].toString())) {
+ logger.info("skip ${catalogName} because enable_debug_points is not true")
+ return
+ }
+
+ def jint = { long value ->
+ long result = value & 0xffffffffL
+ if (result >= 0x80000000L) {
+ result -= 0x100000000L
+ }
+ return (int) result
+ }
+
+ def javaStringHash = { String value ->
+ int hash = 0
+ for (int i = 0; i < value.length(); i++) {
+ hash = jint(31L * hash + (int) value.charAt(i))
+ }
+ return hash
+ }
+
+ def javaLongHash = { long value ->
+ return jint(value ^ (value >>> 32))
+ }
+
+ def objectsHash = { List<Object> values ->
+ int hash = 1
+ values.each { value ->
+ int elementHash
+ if (value instanceof Number) {
+ elementHash = javaLongHash(((Number) value).longValue())
+ } else {
+ elementHash = javaStringHash(value.toString())
+ }
+ hash = jint(31L * hash + elementHash)
+ }
+ return hash
+ }
+
+ def spread = { int hash ->
+ return jint(hash ^ (((long) hash & 0xffffffffL) >>> 16))
+ }
+
+ // Compute collision table names that land in the same Caffeine hash bin as the slow key.
+ def findCollisionTableNames = { long catalogId ->
+ def calcIndex = { String tableName ->
+ int hash = objectsHash([catalogId, remoteDbName, tableName, remoteDbName, tableName])
+ return spread(hash) & (hashMod - 1)
+ }
+ int targetIndex = calcIndex(slowTableName)
+ def tableNames = []
+ for (int i = 0; i < 300000; i++) {
+ String candidate = String.format("collide_%06d", i)
+ if (calcIndex(candidate) == targetIndex) {
+ tableNames.add(candidate)
+ if (tableNames.size() >= collisionTableCount) {
+ break
+ }
+ }
+ }
+ assertEquals(collisionTableCount, tableNames.size())
+ return tableNames
+ }
+
+ def executeOnRemoteMysql = { String statement ->
+ connect("root", "123456", mysqlJdbcUrl) {
+ sql statement
+ }
+ }
+
+ // Recreate the remote schema so every run starts from the same cache and metadata state.
+ def recreateRemoteObjects = { List<String> collisionTableNames ->
+ executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+ executeOnRemoteMysql("""CREATE DATABASE ${remoteDbName}""")
+ executeOnRemoteMysql("""CREATE TABLE ${remoteDbName}.${slowTableName} (k INT)""")
+ collisionTableNames.each { tableName ->
+ executeOnRemoteMysql("""CREATE TABLE ${remoteDbName}.${tableName} (k INT)""")
+ }
+ }
+
+ def preheatSchemaCache = { List<String> collisionTableNames ->
+ sql """REFRESH CATALOG ${catalogName}"""
+ collisionTableNames.each { tableName ->
+ sql """DESC ${catalogName}.${remoteDbName}.${tableName}"""
+ }
+ assertEquals(collisionTableCount, getSchemaCacheSize())
+ }
+
+ // Read the current schema cache size for this catalog from information_schema.
+ def getSchemaCacheSize = {
+ def statRows = sql """
+ SELECT ESTIMATED_SIZE
+ FROM information_schema.catalog_meta_cache_statistics
+ WHERE CATALOG_NAME = '${catalogName}'
+ AND ENTRY_NAME = 'schema'
+ """
+ return ((Number) statRows[0][0]).intValue()
+ }
+
+ // Run the blocking reproduction once and assert the refresh latency profile.
+ def runRefreshRace = { boolean manualMissLoadEnabled, long minRefreshMs, long maxRefreshMs ->
+ sql """ADMIN SET FRONTEND CONFIG ('enable_external_meta_cache_manual_miss_load' = '${manualMissLoadEnabled}')"""
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs(
+ "PluginDrivenExternalTable.initSchema.sleep",
+ ["sleepMs": String.valueOf(slowSleepMs)])
+
+ def descElapsedMs = -1L
+ def descFailure = null
+ def descThread = Thread.start {
+ try {
+ long descStart = System.currentTimeMillis()
+ connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) {
+ sql """DESC ${catalogName}.${remoteDbName}.${slowTableName}"""
+ }
+ descElapsedMs = System.currentTimeMillis() - descStart
+ } catch (Throwable t) {
+ descFailure = t
+ }
+ }
+
+ // Delay the refresh long enough so the DESC path can enter the injected slow schema load.
+ Thread.sleep(refreshDelayMs)
+
+ long refreshStart = System.currentTimeMillis()
+ sql """REFRESH CATALOG ${catalogName}"""
+ long refreshElapsedMs = System.currentTimeMillis() - refreshStart
+
+ descThread.join(slowSleepMs + 15000)
+ if (descThread.isAlive()) {
+ throw new IllegalStateException("desc thread does not finish in time")
+ }
+ if (descFailure != null) {
+ throw new IllegalStateException("desc thread failed", descFailure)
+ }
+
+ logger.info("manualMissLoadEnabled=${manualMissLoadEnabled}, refreshElapsedMs=${refreshElapsedMs}, descElapsedMs=${descElapsedMs}")
+ assertTrue(descElapsedMs >= slowSleepMs - 1000)
+ assertTrue(refreshElapsedMs >= minRefreshMs)
+ assertTrue(refreshElapsedMs <= maxRefreshMs)
+ assertTrue(refreshElapsedMs < descElapsedMs)
+ return [refreshElapsedMs, descElapsedMs]
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs("PluginDrivenExternalTable.initSchema.sleep")
+ }
+ }
+
+ try {
+ executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+ executeOnRemoteMysql("""CREATE DATABASE ${remoteDbName}""")
+ sql """DROP CATALOG IF EXISTS ${catalogName}"""
+ sql """CREATE CATALOG ${catalogName} PROPERTIES(
+ "type" = "jdbc",
+ "user" = "root",
+ "password" = "123456",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysqlPort}/${remoteDbName}?useSSL=false",
+ "driver_url" = "${driverUrl}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "only_specified_database" = "true",
+ "include_database_list" = "${remoteDbName}"
+ )"""
+
+ def catalogRows = sql """SHOW CATALOGS LIKE '${catalogName}'"""
+ long catalogId = ((Number) catalogRows[0][0]).longValue()
+ def collisionTableNames = findCollisionTableNames(catalogId)
+
+ recreateRemoteObjects(collisionTableNames)
+ preheatSchemaCache(collisionTableNames)
+ def blockedResult = runRefreshRace(false, 8000L, slowSleepMs + 5000L)
+
+ recreateRemoteObjects(collisionTableNames)
+ preheatSchemaCache(collisionTableNames)
+ def manualLoadResult = runRefreshRace(true, 0L, 5000L)
+
+ // Verify the invalidated slow load did not write back the stale key into schema cache.
+ assertEquals(0, getSchemaCacheSize())
+ sql """DESC ${catalogName}.${remoteDbName}.${slowTableName}"""
+ assertEquals(1, getSchemaCacheSize())
+ assertTrue(((Number) blockedResult[0]).longValue() > ((Number) manualLoadResult[0]).longValue() + 5000L)
+ } finally {
+ try_sql("""ADMIN SET FRONTEND CONFIG ('enable_external_meta_cache_manual_miss_load' = '${originalManualMissLoadValue}')""")
+ try {
+ GetDebugPoint().disableDebugPointForAllFEs("PluginDrivenExternalTable.initSchema.sleep")
+ } catch (Throwable t) {
+ logger.warn("failed to disable debug point during cleanup", t)
+ }
+ try_sql("""DROP CATALOG IF EXISTS ${catalogName}""")
+ try {
+ executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+ } catch (Throwable t) {
+ logger.warn("failed to drop remote database during cleanup", t)
+ }
+ }
+}