This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new ba921e343d9 branch-4.1: [refact](udf) remove the udf cache expiration_time property #63897 (#63991)
ba921e343d9 is described below
commit ba921e343d981ebe371b45eed3d2ffb3499d7f70
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 19:06:39 2026 +0800
branch-4.1: [refact](udf) remove the udf cache expiration_time property #63897 (#63991)
Cherry-picked from #63897
Co-authored-by: zhangstar333 <[email protected]>
---
.../doris/common/classloader/ScannerLoader.java | 72 +++++++++++++--
.../apache/doris/common/jni/utils/ExpiringMap.java | 100 ---------------------
.../java/org/apache/doris/udf/BaseExecutor.java | 17 +++-
3 files changed, 78 insertions(+), 111 deletions(-)
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 1c5874031d4..f8a119efaa9 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -17,7 +17,6 @@
package org.apache.doris.common.classloader;
-import org.apache.doris.common.jni.utils.ExpiringMap;
import org.apache.doris.common.jni.utils.UdfClassCache;
import com.google.common.collect.Streams;
@@ -37,6 +36,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
@@ -88,7 +88,21 @@ public class ScannerLoader {
public static final Logger LOG = LogManager.getLogger(ScannerLoader.class);
private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
- private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses =
new ExpiringMap<>();
+ // Cache of UDF class metadata (including the URLClassLoader used to load the UDF).
+ // Entries are inserted on first use and only ever removed by an explicit
+ // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is intentionally
+ // no time-based eviction: that previously caused two issues —
+ // 1) closing a URLClassLoader while another thread was still loading classes from it
+ // led to NoClassDefFoundError;
+ // 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting
+ // ClassLoaders for the same UDF, which broke lazy class resolution and reflective
+ // lookups inside user UDF code.
+ // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only reachable after
+ // cleanUdfClassLoader() — concurrent first-time loads of the same signature can also
+ // both observe a miss. cacheClassLoader() must therefore insert atomically via
+ // putIfAbsent and must never close a cache that was already published to the map,
+ // because another executor may already be holding it.
+ private static final Map<String, UdfClassCache> udfLoadedClasses = new
ConcurrentHashMap<>();
private static final String CLASS_SUFFIX = ".class";
private static final String LOAD_PACKAGE = "org.apache.doris";
@@ -116,15 +130,57 @@ public class ScannerLoader {
return udfLoadedClasses.get(functionSignature);
}
- public static synchronized void cacheClassLoader(String functionSignature,
UdfClassCache classCache,
+ /**
+ * Cache the UDF class metadata for the given function signature.
+ *
+ * <p>Insertion is atomic via {@link Map#putIfAbsent}: if another executor thread has
+ * already published a cache entry for {@code functionSignature}, the {@code classCache}
+ * argument is treated as a redundant build and closed here (it has not yet been handed
+ * to any executor, so closing its URLClassLoader is safe). The already-published entry
+ * is returned to the caller so the current executor can switch to it.</p>
+ *
+ * <p>The {@code expirationTime} parameter is kept for backward compatibility with the
+ * existing call sites and DDL property {@code expiration_time}, but is no longer used:
+ * cached entries are not evicted by time. Removal happens only via
+ * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.</p>
+ *
+ * @return the {@link UdfClassCache} actually held in the map after this call —
+ * either {@code classCache} (we won the race) or the pre-existing entry
+ * (another thread won; {@code classCache} has been closed and must not be used).
+ */
+ public static UdfClassCache cacheClassLoader(String functionSignature,
UdfClassCache classCache,
long expirationTime) {
- LOG.info("Cache UDF for: {}", functionSignature);
- udfLoadedClasses.put(functionSignature, classCache, expirationTime *
60 * 1000L);
+ LOG.info("Cache UDF for: " + functionSignature);
+ UdfClassCache existing =
udfLoadedClasses.putIfAbsent(functionSignature, classCache);
+ if (existing == null) {
+ return classCache;
+ }
+ // Lost the race against a concurrent first-time load. The cache we
just built has
+ // never been exposed to any executor, so closing its URLClassLoader
here cannot
+ // affect anyone. Do NOT touch `existing` — another executor may
already be using it.
+ try {
+ classCache.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close redundant UdfClassCache for " +
functionSignature, e);
+ }
+ return existing;
}
- public synchronized void cleanUdfClassLoader(String functionSignature) {
- LOG.info("cleanUdfClassLoader for: {}", functionSignature);
- udfLoadedClasses.remove(functionSignature);
+ public void cleanUdfClassLoader(String functionSignature) {
+ LOG.info("cleanUdfClassLoader for: " + functionSignature);
+ UdfClassCache removed = udfLoadedClasses.remove(functionSignature);
+ if (removed != null) {
+ // Immediately close the URLClassLoader. NOTE: any in-flight query
still holding a
+ // reference to this cache (e.g. via JNIContext.executor) will
fail with
+ // NoClassDefFoundError on lazy class resolution after this point.
This is the
+ // accepted semantic of DROP FUNCTION: the function is gone,
queries against it
+ // are expected to fail.
+ try {
+ removed.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close UdfClassCache for " +
functionSignature, e);
+ }
+ }
}
/**
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
deleted file mode 100644
index 3496d5bbb63..00000000000
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.jni.utils;
-
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ExpiringMap<K, V> {
- private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(); //
key --> value
- private final ConcurrentHashMap<K, Long> ttlMap = new
ConcurrentHashMap<>(); // key --> ttl interval
- // key --> expirationTime(ttl interval + currentTimeMillis)
- private final ConcurrentHashMap<K, Long> expirationMap = new
ConcurrentHashMap<>();
- private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
- private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10
minutes
- public static final Logger LOG = Logger.getLogger(ExpiringMap.class);
-
- public ExpiringMap() {
- startExpirationTask();
- }
-
- public void put(K key, V value, long expirationTimeMs) {
- long expirationTime = System.currentTimeMillis() + expirationTimeMs;
- map.put(key, value);
- expirationMap.put(key, expirationTime);
- ttlMap.put(key, expirationTimeMs);
- }
-
- public V get(K key) {
- Long expirationTime = expirationMap.get(key);
- if (expirationTime == null || System.currentTimeMillis() >
expirationTime) {
- remove(key);
- return null;
- }
- // reset time again
- long ttl = ttlMap.get(key);
- long newExpirationTime = System.currentTimeMillis() + ttl;
- expirationMap.put(key, newExpirationTime);
- return map.get(key);
- }
-
- private void startExpirationTask() {
- scheduler.scheduleAtFixedRate(() -> {
- long now = System.currentTimeMillis();
- for (K key : expirationMap.keySet()) {
- if (expirationMap.get(key) <= now) {
- remove(key);
- }
- }
- }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES);
- }
-
- public void remove(K key) {
- V value = map.remove(key);
- expirationMap.remove(key);
- ttlMap.remove(key);
-
- // Uniformly release resources for any AutoCloseable value,
- if (value instanceof AutoCloseable) {
- try {
- ((AutoCloseable) value).close();
- } catch (Exception e) {
- LOG.warn("Failed to close cached resource: " + key, e);
- }
- }
- }
-
- public int size() {
- return map.size();
- }
-
- public void shutdown() {
- scheduler.shutdown();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
- scheduler.shutdownNow();
- }
- } catch (InterruptedException e) {
- scheduler.shutdownNow();
- }
- }
-}
diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 0967e9fde0b..6356576baaf 100644
--- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -138,8 +138,12 @@ public abstract class BaseExecutor {
UdfClassCache cache = null;
if (isStaticLoad) {
cache = ScannerLoader.getUdfClassLoader(signature);
- if (cache != null && cache.classLoader != null) {
- // Reuse the cached classLoader to ensure dependent classes
can be loaded
+ if (cache != null) {
+ // Reuse the cached classLoader to ensure dependent classes
can be loaded.
+ // NOTE: cache.classLoader may be null when the UDF was
originally loaded via
+ // the system class loader (jarPath empty / custom_lib UDF);
see
+ // UdfClassCache#classLoader. A null value here is a valid
cached state and
+ // must NOT trigger a rebuild — only an actual cache miss does.
classLoader = cache.classLoader;
}
}
@@ -162,7 +166,14 @@ public abstract class BaseExecutor {
cache.classLoader = classLoader;
checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
if (isStaticLoad) {
- ScannerLoader.cacheClassLoader(signature, cache,
expirationTime);
+ UdfClassCache effective =
ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
+ if (effective != cache) {
+ // Another thread won the publish race. Our locally-built
cache (and its
+ // URLClassLoader) was already closed inside
cacheClassLoader(); switch to
+ // the published one so we share its live classLoader.
+ cache = effective;
+ classLoader = cache.classLoader;
+ }
}
}
return cache;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]