This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d77df4f60 [core] Fix 'Trying to access closed classloader' in Flink 
(#4116)
d77df4f60 is described below

commit d77df4f6000d9b6f460d84fb3ae482b9fc3c583b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 4 13:25:47 2024 +0800

    [core] Fix 'Trying to access closed classloader' in Flink (#4116)
---
 .../java/org/apache/paimon/utils/ThreadPoolUtils.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index 6111b0e0f..02b5d73fc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -120,8 +120,14 @@ public class ThreadPoolUtils {
     public static <U> void randomlyOnlyExecute(
             ExecutorService executor, Consumer<U> processor, Collection<U> 
input) {
         List<Future<?>> futures = new ArrayList<>(input.size());
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
         for (U u : input) {
-            futures.add(executor.submit(() -> processor.accept(u)));
+            futures.add(
+                    executor.submit(
+                            () -> {
+                                
Thread.currentThread().setContextClassLoader(cl);
+                                processor.accept(u);
+                            }));
         }
         awaitAllFutures(futures);
     }
@@ -129,8 +135,14 @@ public class ThreadPoolUtils {
     public static <U, T> Iterator<T> randomlyExecute(
             ExecutorService executor, Function<U, List<T>> processor, 
Collection<U> input) {
         List<Future<List<T>>> futures = new ArrayList<>(input.size());
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
         for (U u : input) {
-            futures.add(executor.submit(() -> processor.apply(u)));
+            futures.add(
+                    executor.submit(
+                            () -> {
+                                
Thread.currentThread().setContextClassLoader(cl);
+                                return processor.apply(u);
+                            }));
         }
         return futuresToIterIter(futures);
     }

Reply via email to