This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 58af3a0129 [core] Limit max task number in ManifestReadThreadPool 
(#5737)
58af3a0129 is described below

commit 58af3a0129bc55918b9bd1ad55f703ecc0c37f60
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jun 14 08:57:37 2025 +0800

    [core] Limit max task number in ManifestReadThreadPool (#5737)
---
 .../org/apache/paimon/utils/ThreadPoolUtils.java   |  16 +-
 .../paimon/utils/SemaphoredDelegatingExecutor.java | 184 +++++++++++++++++++++
 .../paimon/utils/ManifestReadThreadPool.java       |  27 ++-
 3 files changed, 205 insertions(+), 22 deletions(-)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java 
b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index a4790583c5..7ad3425367 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -21,8 +21,6 @@ package org.apache.paimon.utils;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,8 +34,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -78,20 +74,12 @@ public class ThreadPoolUtils {
         return executor;
     }
 
-    public static ScheduledExecutorService createScheduledThreadPool(
-            int threadNum, String namePrefix) {
-        return new ScheduledThreadPoolExecutor(threadNum, 
newDaemonThreadFactory(namePrefix));
-    }
-
     /** This method aims to parallel process tasks with memory control and 
sequentially. */
     public static <T, U> Iterable<T> sequentialBatchedExecute(
-            ThreadPoolExecutor executor,
+            ExecutorService executor,
             Function<U, List<T>> processor,
             List<U> input,
-            @Nullable Integer queueSize) {
-        if (queueSize == null) {
-            queueSize = executor.getMaximumPoolSize();
-        }
+            int queueSize) {
         if (queueSize <= 0) {
             throw new NegativeArraySizeException("queue size should not be 
negative");
         }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000000..bdbb23796b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ForwardingExecutorService;
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link ForwardingExecutorService} to delegate tasks to limit the number 
of tasks executed
+ * concurrently.
+ */
+public class SemaphoredDelegatingExecutor extends ForwardingExecutorService {
+
+    private final Semaphore queueingPermits;
+    private final ExecutorService executorDelegated;
+    private final int permitCount;
+
+    public SemaphoredDelegatingExecutor(
+            ExecutorService executorDelegated, int permitCount, boolean fair) {
+        this.permitCount = permitCount;
+        this.queueingPermits = new Semaphore(permitCount, fair);
+        this.executorDelegated = executorDelegated;
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+        return this.executorDelegated;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit 
unit) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        try {
+            this.queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        return super.submit(new CallableWithPermitRelease(task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        try {
+            this.queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        return super.submit(new RunnableWithPermitRelease(task), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        try {
+            this.queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        return super.submit(new RunnableWithPermitRelease(task));
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        try {
+            this.queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        super.execute(new RunnableWithPermitRelease(command));
+    }
+
+    public int getAvailablePermits() {
+        return this.queueingPermits.availablePermits();
+    }
+
+    public int getWaitingCount() {
+        return this.queueingPermits.getQueueLength();
+    }
+
+    public int getPermitCount() {
+        return this.permitCount;
+    }
+
+    @Override
+    public String toString() {
+        return "SemaphoredDelegatingExecutor{"
+                + "permitCount="
+                + getPermitCount()
+                + ", available="
+                + getAvailablePermits()
+                + ", waiting="
+                + getWaitingCount()
+                + '}';
+    }
+
+    private class RunnableWithPermitRelease implements Runnable {
+
+        private final Runnable delegated;
+
+        RunnableWithPermitRelease(Runnable delegated) {
+            this.delegated = delegated;
+        }
+
+        @Override
+        public void run() {
+            try {
+                this.delegated.run();
+            } finally {
+                SemaphoredDelegatingExecutor.this.queueingPermits.release();
+            }
+        }
+    }
+
+    private class CallableWithPermitRelease<T> implements Callable<T> {
+
+        private final Callable<T> delegated;
+
+        CallableWithPermitRelease(Callable<T> delegated) {
+            this.delegated = delegated;
+        }
+
+        @Override
+        public T call() throws Exception {
+            T result;
+            try {
+                result = this.delegated.call();
+            } finally {
+                SemaphoredDelegatingExecutor.this.queueingPermits.release();
+            }
+
+            return result;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 49fcfc8bd9..16bd73ee80 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 
@@ -35,28 +36,38 @@ public class ManifestReadThreadPool {
     private static ThreadPoolExecutor executorService =
             createCachedThreadPool(Runtime.getRuntime().availableProcessors(), 
THREAD_NAME);
 
-    public static synchronized ThreadPoolExecutor getExecutorService(@Nullable 
Integer threadNum) {
-        if (threadNum == null || threadNum <= 
executorService.getMaximumPoolSize()) {
+    public static synchronized ExecutorService getExecutorService(@Nullable 
Integer threadNum) {
+        if (threadNum == null || threadNum == 
executorService.getMaximumPoolSize()) {
             return executorService;
         }
-        // we don't need to close previous pool
-        // it is just cached pool
-        executorService = createCachedThreadPool(threadNum, THREAD_NAME);
+        if (threadNum < executorService.getMaximumPoolSize()) {
+            return new SemaphoredDelegatingExecutor(executorService, 
threadNum, false);
+        } else {
+            // we don't need to close previous pool
+            // it is just cached pool
+            executorService = createCachedThreadPool(threadNum, THREAD_NAME);
 
-        return executorService;
+            return executorService;
+        }
     }
 
     /** This method aims to parallel process tasks with memory control and 
sequentially. */
     public static <T, U> Iterable<T> sequentialBatchedExecute(
             Function<U, List<T>> processor, List<U> input, @Nullable Integer 
threadNum) {
-        ThreadPoolExecutor executor = getExecutorService(threadNum);
+        ExecutorService executor = getExecutorService(threadNum);
+        if (threadNum == null) {
+            threadNum =
+                    executor instanceof ThreadPoolExecutor
+                            ? ((ThreadPoolExecutor) 
executor).getMaximumPoolSize()
+                            : ((SemaphoredDelegatingExecutor) 
executor).getPermitCount();
+        }
         return ThreadPoolUtils.sequentialBatchedExecute(executor, processor, 
input, threadNum);
     }
 
     /** This method aims to parallel process tasks with randomly but return 
values sequentially. */
     public static <T, U> Iterator<T> randomlyExecuteSequentialReturn(
             Function<U, List<T>> processor, List<U> input, @Nullable Integer 
threadNum) {
-        ThreadPoolExecutor executor = getExecutorService(threadNum);
+        ExecutorService executor = getExecutorService(threadNum);
         return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor, 
processor, input);
     }
 }

Reply via email to