This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new ba921e343d9 branch-4.1: [refact](udf) remove the udf cache expiration_time property #63897 (#63991)
ba921e343d9 is described below

commit ba921e343d981ebe371b45eed3d2ffb3499d7f70
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 19:06:39 2026 +0800

    branch-4.1: [refact](udf) remove the udf cache expiration_time property #63897 (#63991)
    
    Cherry-picked from #63897
    
    Co-authored-by: zhangstar333 <[email protected]>
---
 .../doris/common/classloader/ScannerLoader.java    |  72 +++++++++++++--
 .../apache/doris/common/jni/utils/ExpiringMap.java | 100 ---------------------
 .../java/org/apache/doris/udf/BaseExecutor.java    |  17 +++-
 3 files changed, 78 insertions(+), 111 deletions(-)

diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 1c5874031d4..f8a119efaa9 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.common.classloader;
 
-import org.apache.doris.common.jni.utils.ExpiringMap;
 import org.apache.doris.common.jni.utils.UdfClassCache;
 
 import com.google.common.collect.Streams;
@@ -37,6 +36,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.stream.Collectors;
@@ -88,7 +88,21 @@ public class ScannerLoader {
 
     public static final Logger LOG = LogManager.getLogger(ScannerLoader.class);
     private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
-    private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = 
new ExpiringMap<>();
+    // Cache of UDF class metadata (including the URLClassLoader used to load 
the UDF).
+    // Entries are inserted on first use and only ever removed by an explicit
+    // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is 
intentionally
+    // no time-based eviction: that previously caused two issues —
+    //   1) closing a URLClassLoader while another thread was still loading 
classes from it
+    //      led to NoClassDefFoundError;
+    //   2) rebuilding a fresh URLClassLoader on every eviction produced 
multiple coexisting
+    //      ClassLoaders for the same UDF, which broke lazy class resolution 
and reflective
+    //      lookups inside user UDF code.
+    // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only 
reachable after
+    // cleanUdfClassLoader() — concurrent first-time loads of the same 
signature can also
+    // both observe a miss. cacheClassLoader() must therefore insert 
atomically via
+    // putIfAbsent and must never close a cache that was already published to 
the map,
+    // because another executor may already be holding it.
+    private static final Map<String, UdfClassCache> udfLoadedClasses = new 
ConcurrentHashMap<>();
     private static final String CLASS_SUFFIX = ".class";
     private static final String LOAD_PACKAGE = "org.apache.doris";
 
@@ -116,15 +130,57 @@ public class ScannerLoader {
         return udfLoadedClasses.get(functionSignature);
     }
 
-    public static synchronized void cacheClassLoader(String functionSignature, 
UdfClassCache classCache,
+    /**
+     * Cache the UDF class metadata for the given function signature.
+     *
+     * <p>Insertion is atomic via {@link Map#putIfAbsent}: if another executor 
thread has
+     * already published a cache entry for {@code functionSignature}, the 
{@code classCache}
+     * argument is treated as a redundant build and closed here (it has not 
yet been handed
+     * to any executor, so closing its URLClassLoader is safe). The 
already-published entry
+     * is returned to the caller so the current executor can switch to it.</p>
+     *
+     * <p>The {@code expirationTime} parameter is kept for backward 
compatibility with the
+     * existing call sites and DDL property {@code expiration_time}, but is no 
longer used:
+     * cached entries are not evicted by time. Removal happens only via
+     * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.</p>
+     *
+     * @return the {@link UdfClassCache} actually held in the map after this 
call —
+     *         either {@code classCache} (we won the race) or the pre-existing 
entry
+     *         (another thread won; {@code classCache} has been closed and 
must not be used).
+     */
+    public static UdfClassCache cacheClassLoader(String functionSignature, 
UdfClassCache classCache,
             long expirationTime) {
-        LOG.info("Cache UDF for: {}", functionSignature);
-        udfLoadedClasses.put(functionSignature, classCache, expirationTime * 
60 * 1000L);
+        LOG.info("Cache UDF for: " + functionSignature);
+        UdfClassCache existing = 
udfLoadedClasses.putIfAbsent(functionSignature, classCache);
+        if (existing == null) {
+            return classCache;
+        }
+        // Lost the race against a concurrent first-time load. The cache we 
just built has
+        // never been exposed to any executor, so closing its URLClassLoader 
here cannot
+        // affect anyone. Do NOT touch `existing` — another executor may 
already be using it.
+        try {
+            classCache.close();
+        } catch (Exception e) {
+            LOG.warn("Failed to close redundant UdfClassCache for " + 
functionSignature, e);
+        }
+        return existing;
     }
 
-    public synchronized void cleanUdfClassLoader(String functionSignature) {
-        LOG.info("cleanUdfClassLoader for: {}", functionSignature);
-        udfLoadedClasses.remove(functionSignature);
+    public void cleanUdfClassLoader(String functionSignature) {
+        LOG.info("cleanUdfClassLoader for: " + functionSignature);
+        UdfClassCache removed = udfLoadedClasses.remove(functionSignature);
+        if (removed != null) {
+            // Immediately close the URLClassLoader. NOTE: any in-flight query 
still holding a
+            // reference to this cache (e.g. via JNIContext.executor) will 
fail with
+            // NoClassDefFoundError on lazy class resolution after this point. 
This is the
+            // accepted semantic of DROP FUNCTION: the function is gone, 
queries against it
+            // are expected to fail.
+            try {
+                removed.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close UdfClassCache for " + 
functionSignature, e);
+            }
+        }
     }
 
     /**
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
deleted file mode 100644
index 3496d5bbb63..00000000000
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.jni.utils;
-
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ExpiringMap<K, V> {
-    private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(); // 
key --> value
-    private final ConcurrentHashMap<K, Long> ttlMap = new 
ConcurrentHashMap<>(); // key --> ttl interval
-    // key --> expirationTime(ttl interval + currentTimeMillis)
-    private final ConcurrentHashMap<K, Long> expirationMap = new 
ConcurrentHashMap<>();
-    private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
-    private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 
minutes
-    public static final Logger LOG = Logger.getLogger(ExpiringMap.class);
-
-    public ExpiringMap() {
-        startExpirationTask();
-    }
-
-    public void put(K key, V value, long expirationTimeMs) {
-        long expirationTime = System.currentTimeMillis() + expirationTimeMs;
-        map.put(key, value);
-        expirationMap.put(key, expirationTime);
-        ttlMap.put(key, expirationTimeMs);
-    }
-
-    public V get(K key) {
-        Long expirationTime = expirationMap.get(key);
-        if (expirationTime == null || System.currentTimeMillis() > 
expirationTime) {
-            remove(key);
-            return null;
-        }
-        // reset time again
-        long ttl = ttlMap.get(key);
-        long newExpirationTime = System.currentTimeMillis() + ttl;
-        expirationMap.put(key, newExpirationTime);
-        return map.get(key);
-    }
-
-    private void startExpirationTask() {
-        scheduler.scheduleAtFixedRate(() -> {
-            long now = System.currentTimeMillis();
-            for (K key : expirationMap.keySet()) {
-                if (expirationMap.get(key) <= now) {
-                    remove(key);
-                }
-            }
-        }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES);
-    }
-
-    public void remove(K key) {
-        V value = map.remove(key);
-        expirationMap.remove(key);
-        ttlMap.remove(key);
-
-        // Uniformly release resources for any AutoCloseable value,
-        if (value instanceof AutoCloseable) {
-            try {
-                ((AutoCloseable) value).close();
-            } catch (Exception e) {
-                LOG.warn("Failed to close cached resource: " + key, e);
-            }
-        }
-    }
-
-    public int size() {
-        return map.size();
-    }
-
-    public void shutdown() {
-        scheduler.shutdown();
-        try {
-            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
-                scheduler.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            scheduler.shutdownNow();
-        }
-    }
-}
diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 0967e9fde0b..6356576baaf 100644
--- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -138,8 +138,12 @@ public abstract class BaseExecutor {
         UdfClassCache cache = null;
         if (isStaticLoad) {
             cache = ScannerLoader.getUdfClassLoader(signature);
-            if (cache != null && cache.classLoader != null) {
-                // Reuse the cached classLoader to ensure dependent classes 
can be loaded
+            if (cache != null) {
+                // Reuse the cached classLoader to ensure dependent classes 
can be loaded.
+                // NOTE: cache.classLoader may be null when the UDF was 
originally loaded via
+                // the system class loader (jarPath empty / custom_lib UDF); 
see
+                // UdfClassCache#classLoader. A null value here is a valid 
cached state and
+                // must NOT trigger a rebuild — only an actual cache miss does.
                 classLoader = cache.classLoader;
             }
         }
@@ -162,7 +166,14 @@ public abstract class BaseExecutor {
             cache.classLoader = classLoader;
             checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
             if (isStaticLoad) {
-                ScannerLoader.cacheClassLoader(signature, cache, 
expirationTime);
+                UdfClassCache effective = 
ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
+                if (effective != cache) {
+                    // Another thread won the publish race. Our locally-built 
cache (and its
+                    // URLClassLoader) was already closed inside 
cacheClassLoader(); switch to
+                    // the published one so we share its live classLoader.
+                    cache = effective;
+                    classLoader = cache.classLoader;
+                }
             }
         }
         return cache;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to