This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 58af3a0129 [core] Limit max task number in ManifestReadThreadPool (#5737)
58af3a0129 is described below
commit 58af3a0129bc55918b9bd1ad55f703ecc0c37f60
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jun 14 08:57:37 2025 +0800
[core] Limit max task number in ManifestReadThreadPool (#5737)
---
.../org/apache/paimon/utils/ThreadPoolUtils.java | 16 +-
.../paimon/utils/SemaphoredDelegatingExecutor.java | 184 +++++++++++++++++++++
.../paimon/utils/ManifestReadThreadPool.java | 27 ++-
3 files changed, 205 insertions(+), 22 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index a4790583c5..7ad3425367 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -21,8 +21,6 @@ package org.apache.paimon.utils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-import javax.annotation.Nullable;
-
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,8 +34,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -78,20 +74,12 @@ public class ThreadPoolUtils {
return executor;
}
- public static ScheduledExecutorService createScheduledThreadPool(
- int threadNum, String namePrefix) {
- return new ScheduledThreadPoolExecutor(threadNum,
newDaemonThreadFactory(namePrefix));
- }
-
/** This method aims to parallel process tasks with memory control and
sequentially. */
public static <T, U> Iterable<T> sequentialBatchedExecute(
- ThreadPoolExecutor executor,
+ ExecutorService executor,
Function<U, List<T>> processor,
List<U> input,
- @Nullable Integer queueSize) {
- if (queueSize == null) {
- queueSize = executor.getMaximumPoolSize();
- }
+ int queueSize) {
if (queueSize <= 0) {
throw new NegativeArraySizeException("queue size should not be
negative");
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000000..bdbb23796b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ForwardingExecutorService;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link ForwardingExecutorService} that limits the number of tasks executed concurrently by
+ * the delegate.
+ *
+ * <p>Every {@code submit}/{@code execute} call blocks the calling thread until one of the {@code
+ * permitCount} permits is free. The permit is handed back exactly once: when the wrapped task
+ * finishes (normally or exceptionally), or when the delegate refuses the task. The bulk {@code
+ * invokeAll}/{@code invokeAny} operations are not supported.
+ */
+public class SemaphoredDelegatingExecutor extends ForwardingExecutorService {
+
+    private final Semaphore queueingPermits;
+    private final ExecutorService executorDelegated;
+    private final int permitCount;
+
+    /**
+     * @param executorDelegated executor that actually runs the tasks
+     * @param permitCount maximum number of tasks that may hold a permit at the same time
+     * @param fair whether waiting submitters obtain permits in FIFO order
+     */
+    public SemaphoredDelegatingExecutor(
+            ExecutorService executorDelegated, int permitCount, boolean fair) {
+        this.permitCount = permitCount;
+        this.queueingPermits = new Semaphore(permitCount, fair);
+        this.executorDelegated = executorDelegated;
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+        return this.executorDelegated;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /**
+     * Waits for a permit, then submits the task. If the wait is interrupted, the interrupt flag is
+     * restored and a future that has already failed with the {@link InterruptedException} is
+     * returned instead of submitting.
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        final Permit permit;
+        try {
+            permit = acquirePermit();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        try {
+            return super.submit(new CallableWithPermitRelease<>(task, permit));
+        } catch (RuntimeException | Error e) {
+            // The delegate refused the task (e.g. it is shut down), so the wrapper may never run;
+            // hand the permit back here or it would be lost for good.
+            permit.release();
+            throw e;
+        }
+    }
+
+    /** Same contract as {@link #submit(Callable)}, for a {@link Runnable} with a fixed result. */
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        final Permit permit;
+        try {
+            permit = acquirePermit();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        try {
+            return super.submit(new RunnableWithPermitRelease(task, permit), result);
+        } catch (RuntimeException | Error e) {
+            permit.release();
+            throw e;
+        }
+    }
+
+    /** Same contract as {@link #submit(Callable)}, for a {@link Runnable} without a result. */
+    @Override
+    public Future<?> submit(Runnable task) {
+        final Permit permit;
+        try {
+            permit = acquirePermit();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+
+        try {
+            return super.submit(new RunnableWithPermitRelease(task, permit));
+        } catch (RuntimeException | Error e) {
+            permit.release();
+            throw e;
+        }
+    }
+
+    /**
+     * Waits for a permit, then hands the command to the delegate. If the wait is interrupted, the
+     * interrupt flag is restored and the command is still handed over (best effort), but without
+     * a permit.
+     */
+    @Override
+    public void execute(Runnable command) {
+        final Permit permit;
+        try {
+            permit = acquirePermit();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            // No permit was taken, so the command must go in unwrapped: releasing a permit that
+            // was never acquired would grow the semaphore beyond permitCount.
+            super.execute(command);
+            return;
+        }
+
+        try {
+            super.execute(new RunnableWithPermitRelease(command, permit));
+        } catch (RuntimeException | Error e) {
+            // Either the delegate rejected the command, or it ran the command in this thread and
+            // the command threw. Permit#release() is idempotent, so both cases are safe.
+            permit.release();
+            throw e;
+        }
+    }
+
+    public int getAvailablePermits() {
+        return this.queueingPermits.availablePermits();
+    }
+
+    public int getWaitingCount() {
+        return this.queueingPermits.getQueueLength();
+    }
+
+    public int getPermitCount() {
+        return this.permitCount;
+    }
+
+    @Override
+    public String toString() {
+        return "SemaphoredDelegatingExecutor{"
+                + "permitCount="
+                + getPermitCount()
+                + ", available="
+                + getAvailablePermits()
+                + ", waiting="
+                + getWaitingCount()
+                + '}';
+    }
+
+    /** Blocks until a permit is available and returns a handle used to give it back. */
+    private Permit acquirePermit() throws InterruptedException {
+        this.queueingPermits.acquire();
+        return new Permit();
+    }
+
+    /** Handle for one acquired permit; returns it to the semaphore at most once. */
+    private final class Permit {
+
+        private boolean released;
+
+        synchronized void release() {
+            if (!released) {
+                released = true;
+                SemaphoredDelegatingExecutor.this.queueingPermits.release();
+            }
+        }
+    }
+
+    /** Runs the wrapped {@link Runnable} and gives the permit back when it finishes. */
+    private final class RunnableWithPermitRelease implements Runnable {
+
+        private final Runnable delegated;
+        private final Permit permit;
+
+        RunnableWithPermitRelease(Runnable delegated, Permit permit) {
+            this.delegated = delegated;
+            this.permit = permit;
+        }
+
+        @Override
+        public void run() {
+            try {
+                this.delegated.run();
+            } finally {
+                this.permit.release();
+            }
+        }
+    }
+
+    /** Calls the wrapped {@link Callable} and gives the permit back when it finishes. */
+    private final class CallableWithPermitRelease<T> implements Callable<T> {
+
+        private final Callable<T> delegated;
+        private final Permit permit;
+
+        CallableWithPermitRelease(Callable<T> delegated, Permit permit) {
+            this.delegated = delegated;
+            this.permit = permit;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                return this.delegated.call();
+            } finally {
+                this.permit.release();
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 49fcfc8bd9..16bd73ee80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
@@ -35,28 +36,38 @@ public class ManifestReadThreadPool {
private static ThreadPoolExecutor executorService =
createCachedThreadPool(Runtime.getRuntime().availableProcessors(),
THREAD_NAME);
- public static synchronized ThreadPoolExecutor getExecutorService(@Nullable
Integer threadNum) {
- if (threadNum == null || threadNum <=
executorService.getMaximumPoolSize()) {
+ public static synchronized ExecutorService getExecutorService(@Nullable
Integer threadNum) {
+ if (threadNum == null || threadNum ==
executorService.getMaximumPoolSize()) {
return executorService;
}
- // we don't need to close previous pool
- // it is just cached pool
- executorService = createCachedThreadPool(threadNum, THREAD_NAME);
+ if (threadNum < executorService.getMaximumPoolSize()) {
+ return new SemaphoredDelegatingExecutor(executorService,
threadNum, false);
+ } else {
+ // we don't need to close previous pool
+ // it is just cached pool
+ executorService = createCachedThreadPool(threadNum, THREAD_NAME);
- return executorService;
+ return executorService;
+ }
}
/** This method aims to parallel process tasks with memory control and
sequentially. */
public static <T, U> Iterable<T> sequentialBatchedExecute(
Function<U, List<T>> processor, List<U> input, @Nullable Integer
threadNum) {
- ThreadPoolExecutor executor = getExecutorService(threadNum);
+ ExecutorService executor = getExecutorService(threadNum);
+ if (threadNum == null) {
+ threadNum =
+ executor instanceof ThreadPoolExecutor
+ ? ((ThreadPoolExecutor)
executor).getMaximumPoolSize()
+ : ((SemaphoredDelegatingExecutor)
executor).getPermitCount();
+ }
return ThreadPoolUtils.sequentialBatchedExecute(executor, processor,
input, threadNum);
}
/** This method aims to parallel process tasks with randomly but return
values sequentially. */
public static <T, U> Iterator<T> randomlyExecuteSequentialReturn(
Function<U, List<T>> processor, List<U> input, @Nullable Integer
threadNum) {
- ThreadPoolExecutor executor = getExecutorService(threadNum);
+ ExecutorService executor = getExecutorService(threadNum);
return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor,
processor, input);
}
}