This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c11b95028 [core] Refactor ForkJoinPool to cached thread pool (#3914)
c11b95028 is described below

commit c11b95028e383051ce21a7413e710b33d8b81e6e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 7 13:14:43 2024 +0800

    [core] Refactor ForkJoinPool to cached thread pool (#3914)
---
 .../paimon/utils/FileDeletionThreadPool.java       |  23 +--
 .../org/apache/paimon/utils/ThreadPoolUtils.java   | 173 +++++++++++++++++++++
 .../java/org/apache/paimon/manifest/FileEntry.java |  12 +-
 .../paimon/operation/AbstractFileStoreScan.java    |  64 ++++----
 .../apache/paimon/operation/OrphanFilesClean.java  |  84 ++++------
 .../apache/paimon/table/sink/TableCommitImpl.java  |  32 ++--
 .../java/org/apache/paimon/utils/FileUtils.java    |  35 -----
 .../paimon/utils/ManifestReadThreadPool.java       |  38 ++---
 .../apache/paimon/utils/ScanParallelExecutor.java  | 101 ------------
 ...orTest.java => ManifestReadThreadPoolTest.java} |  48 ++----
 .../apache/paimon/hive/migrate/HiveMigrator.java   |  10 +-
 11 files changed, 296 insertions(+), 324 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
index 07b0f5d32..638d6f9a4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
@@ -20,17 +20,17 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.fs.FileIO;
 
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** Thread pool to delete files using {@link FileIO}. */
 public class FileDeletionThreadPool {
 
+    private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL";
+
     private static ThreadPoolExecutor executorService =
-            createCachedThreadPool(Runtime.getRuntime().availableProcessors());
+            createCachedThreadPool(Runtime.getRuntime().availableProcessors(), 
THREAD_NAME);
 
     public static synchronized ThreadPoolExecutor getExecutorService(int 
threadNum) {
         if (threadNum <= executorService.getMaximumPoolSize()) {
@@ -38,21 +38,8 @@ public class FileDeletionThreadPool {
         }
         // we don't need to close previous pool
         // it is just cached pool
-        executorService = createCachedThreadPool(threadNum);
+        executorService = createCachedThreadPool(threadNum, THREAD_NAME);
 
         return executorService;
     }
-
-    private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
-        ThreadPoolExecutor executor =
-                new ThreadPoolExecutor(
-                        threadNum,
-                        threadNum,
-                        1,
-                        TimeUnit.MINUTES,
-                        new LinkedBlockingQueue<>(),
-                        newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
-        executor.allowCoreThreadTimeOut(true);
-        return executor;
-    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
new file mode 100644
index 000000000..6111b0e0f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+
+/** Utils for thread pool. */
+public class ThreadPoolUtils {
+
+    /**
+     * Create a thread pool with max thread number. Inactive threads will automatically exit.
+     *
+     * <p>The {@link Executors#newCachedThreadPool} cannot limit max thread number. Non-core threads
+     * must be used with {@link SynchronousQueue}, but then new tasks are rejected once the max
+     * thread number is reached. Instead, this pool keeps {@code threadNum} core threads, queues
+     * pending tasks in an unbounded {@link LinkedBlockingQueue}, and lets core threads exit after
+     * one idle minute.
+     *
+     * @param threadNum the number of core threads, which is also the maximum number of threads
+     * @param namePrefix name prefix passed to {@code ThreadUtils.newDaemonThreadFactory}
+     * @return a new thread pool
+     */
+    public static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix) {
+        ThreadPoolExecutor executor =
+                new ThreadPoolExecutor(
+                        threadNum,
+                        threadNum,
+                        1,
+                        TimeUnit.MINUTES,
+                        new LinkedBlockingQueue<>(),
+                        newDaemonThreadFactory(namePrefix));
+        // Let idle core threads time out too, so an unused pool releases all of its threads.
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
+    }
+
+    /**
+     * Process {@code input} in parallel with memory control, returning results sequentially.
+     *
+     * <p>The input is split into consecutive batches of {@code queueSize} elements. Elements of a
+     * batch are processed in parallel by {@code executor}; the next batch is only submitted once
+     * all results of the current batch have been consumed. Results are returned in input order.
+     *
+     * <p>The returned {@link Iterable} is lazy: nothing is submitted until iteration starts, and
+     * every call to {@link Iterable#iterator()} processes the input again from the beginning.
+     *
+     * @param executor the pool running {@code processor}
+     * @param processor maps one input element to its list of results
+     * @param input the elements to process
+     * @param queueSize the batch size, or null to use the maximum pool size of {@code executor}
+     * @return the concatenated results of all elements, in input order
+     * @throws NegativeArraySizeException if the resolved batch size is not positive
+     */
+    public static <T, U> Iterable<T> sequentialBatchedExecute(
+            ThreadPoolExecutor executor,
+            Function<U, List<T>> processor,
+            List<U> input,
+            @Nullable Integer queueSize) {
+        if (queueSize == null) {
+            queueSize = executor.getMaximumPoolSize();
+        }
+        if (queueSize <= 0) {
+            throw new NegativeArraySizeException(
+                    "queue size should be positive, but is " + queueSize);
+        }
+
+        final int batchSize = queueSize;
+        return () -> {
+            // A fresh batch queue per iterator keeps the Iterable re-iterable; a shared queue
+            // would be drained by the first traversal and later traversals would return nothing.
+            final Queue<List<U>> batches = new ArrayDeque<>(Lists.partition(input, batchSize));
+            return new Iterator<T>() {
+                Iterator<T> activeBatch = null;
+                T buffered = null;
+                // Tracked apart from `buffered` so a null result is not mistaken for the end.
+                boolean hasBuffered = false;
+
+                @Override
+                public boolean hasNext() {
+                    advanceIfNeeded();
+                    return hasBuffered;
+                }
+
+                @Override
+                public T next() {
+                    // hasNext() advances, so next() also works without a preceding hasNext().
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                    T result = buffered;
+                    buffered = null;
+                    hasBuffered = false;
+                    return result;
+                }
+
+                private void advanceIfNeeded() {
+                    while (!hasBuffered) {
+                        if (activeBatch != null && activeBatch.hasNext()) {
+                            buffered = activeBatch.next();
+                            hasBuffered = true;
+                        } else if (batches.isEmpty()) {
+                            return;
+                        } else {
+                            // Submit the next batch only after the current one is drained.
+                            activeBatch = randomlyExecute(executor, processor, batches.poll());
+                        }
+                    }
+                }
+            };
+        };
+    }
+
+    /**
+     * Run {@code processor} on every element of {@code input} using {@code executor} and wait for
+     * all tasks to finish. Tasks are submitted up front and may run in any order.
+     *
+     * <p>Futures are awaited in input order; the first failure found this way is rethrown, and the
+     * remaining tasks are neither awaited nor cancelled.
+     *
+     * @throws RuntimeException wrapping the {@link ExecutionException} of a failed task, or the
+     *     {@link InterruptedException} if the calling thread is interrupted while waiting
+     */
+    public static <U> void randomlyOnlyExecute(
+            ExecutorService executor, Consumer<U> processor, Collection<U> input) {
+        List<Future<?>> futures = new ArrayList<>(input.size());
+        for (U u : input) {
+            futures.add(executor.submit(() -> processor.accept(u)));
+        }
+        awaitAllFutures(futures);
+    }
+
+    /**
+     * Submit {@code processor} for every element of {@code input} to {@code executor} and return
+     * an iterator over the concatenated results.
+     *
+     * <p>All tasks are submitted eagerly. The iterator waits for each task in input order, so the
+     * results keep the input order even though the tasks may run in any order. A failed task
+     * surfaces as a {@link RuntimeException} when the iterator reaches its results.
+     */
+    public static <U, T> Iterator<T> randomlyExecute(
+            ExecutorService executor, Function<U, List<T>> processor, Collection<U> input) {
+        List<Future<List<T>>> futures = new ArrayList<>(input.size());
+        for (U u : input) {
+            futures.add(executor.submit(() -> processor.apply(u)));
+        }
+        return futuresToIterIter(futures);
+    }
+
+    /** Lazily concatenate the result lists of {@code futures}, blocking on each in turn. */
+    private static <T> Iterator<T> futuresToIterIter(List<Future<List<T>>> futures) {
+        Queue<Future<List<T>>> queue = new ArrayDeque<>(futures);
+        return Iterators.concat(
+                new Iterator<Iterator<T>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return !queue.isEmpty();
+                    }
+
+                    @Override
+                    public Iterator<T> next() {
+                        try {
+                            return queue.poll().get().iterator();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        } catch (ExecutionException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+    }
+
+    /** Block until every future completes, rethrowing the first failure as unchecked. */
+    private static void awaitAllFutures(List<Future<?>> futures) {
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 741d92296..3efc4ea19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -21,7 +21,6 @@ package org.apache.paimon.manifest;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.ScanParallelExecutor;
 
 import javax.annotation.Nullable;
 
@@ -30,7 +29,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 
 /** Entry representing a file. */
 public interface FileEntry {
@@ -156,12 +156,8 @@ public interface FileEntry {
             ManifestFile manifestFile,
             List<ManifestFileMeta> manifestFiles,
             @Nullable Integer manifestReadParallelism) {
-        return ScanParallelExecutor.parallelismBatchIterable(
-                files ->
-                        files.parallelStream()
-                                .flatMap(
-                                        m -> manifestFile.read(m.fileName(), 
m.fileSize()).stream())
-                                .collect(Collectors.toList()),
+        return sequentialBatchedExecute(
+                file -> manifestFile.read(file.fileName(), file.fileSize()),
                 manifestFiles,
                 manifestReadParallelism);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 5e6f914fe..2c753e270 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -41,7 +41,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ScanParallelExecutor;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -54,14 +53,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {
@@ -261,22 +262,13 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     public List<PartitionEntry> readPartitionEntries() {
         List<ManifestFileMeta> manifests = readManifests().getRight();
         Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
-        // Don't need to use parallelismBatchIterable here
         // Can be executed in disorder
-        ForkJoinPool executePool = 
ScanParallelExecutor.getExecutePool(scanManifestParallelism);
-        List<ForkJoinTask<?>> tasks = new ArrayList<>();
-        for (ManifestFileMeta manifest : manifests) {
-            ForkJoinTask<?> task =
-                    executePool.submit(
-                            () ->
-                                    PartitionEntry.merge(
-                                            
PartitionEntry.merge(readManifestFileMeta(manifest)),
-                                            partitions));
-            tasks.add(task);
-        }
-        for (ForkJoinTask<?> task : tasks) {
-            task.join();
-        }
+        ThreadPoolExecutor executor = 
getExecutorService(scanManifestParallelism);
+        Consumer<ManifestFileMeta> processor =
+                m ->
+                        PartitionEntry.merge(
+                                PartitionEntry.merge(readManifestFileMeta(m)), 
partitions);
+        randomlyOnlyExecute(executor, processor, manifests);
         return partitions.values().stream()
                 .filter(p -> p.fileCount() > 0)
                 .collect(Collectors.toList());
@@ -371,22 +363,24 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             List<ManifestFileMeta> manifests,
             Function<ManifestFileMeta, List<T>> manifestReader,
             @Nullable Filter<T> filterUnmergedEntry) {
-        Iterable<T> entries =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        files -> {
-                            Stream<T> stream =
-                                    files.parallelStream()
-                                            
.filter(this::filterManifestFileMeta)
-                                            .flatMap(m -> 
manifestReader.apply(m).stream());
-                            if (filterUnmergedEntry != null) {
-                                stream = 
stream.filter(filterUnmergedEntry::test);
-                            }
-                            return stream.collect(Collectors.toList());
-                        },
-                        manifests,
-                        scanManifestParallelism);
-
-        return FileEntry.mergeEntries(entries);
+        // in memory filter, do it first
+        manifests =
+                manifests.stream()
+                        .filter(this::filterManifestFileMeta)
+                        .collect(Collectors.toList());
+        Function<ManifestFileMeta, List<T>> reader =
+                file -> {
+                    List<T> entries = manifestReader.apply(file);
+                    if (filterUnmergedEntry != null) {
+                        entries =
+                                entries.stream()
+                                        .filter(filterUnmergedEntry::test)
+                                        .collect(Collectors.toList());
+                    }
+                    return entries;
+                };
+        return FileEntry.mergeEntries(
+                sequentialBatchedExecute(reader, manifests, 
scanManifestParallelism));
     }
 
     private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index b0b06c33d..f0d49964c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -36,10 +36,12 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,6 +63,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -67,6 +72,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
 
 /**
  * To remove the data files and metadata files that are not used by table 
(so-called "orphan
@@ -87,6 +94,10 @@ public class OrphanFilesClean {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OrphanFilesClean.class);
 
+    private static final ThreadPoolExecutor EXECUTOR =
+            createCachedThreadPool(
+                    Runtime.getRuntime().availableProcessors(), 
"ORPHAN_FILES_CLEAN");
+
     private static final int READ_FILE_RETRY_NUM = 3;
     private static final int READ_FILE_RETRY_INTERVAL = 5;
     private static final int SHOW_LIMIT = 200;
@@ -177,25 +188,15 @@ public class OrphanFilesClean {
         List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
         readSnapshots.addAll(taggedSnapshots);
         readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
+        return Sets.newHashSet(randomlyExecute(EXECUTOR, this::getUsedFiles, 
readSnapshots));
+    }
 
-        return FileUtils.COMMON_IO_FORK_JOIN_POOL
-                .submit(
-                        () ->
-                                readSnapshots
-                                        .parallelStream()
-                                        .flatMap(
-                                                snapshot -> {
-                                                    if (snapshot instanceof 
Changelog) {
-                                                        return 
getUsedFilesForChangelog(
-                                                                (Changelog) 
snapshot)
-                                                                .stream();
-                                                    } else {
-                                                        return 
getUsedFilesForSnapshot(snapshot)
-                                                                .stream();
-                                                    }
-                                                })
-                                        .collect(Collectors.toSet()))
-                .get();
+    private List<String> getUsedFiles(Snapshot snapshot) {
+        if (snapshot instanceof Changelog) {
+            return getUsedFilesForChangelog((Changelog) snapshot);
+        } else {
+            return getUsedFilesForSnapshot(snapshot);
+        }
     }
 
     /**
@@ -204,22 +205,19 @@ public class OrphanFilesClean {
      */
     private Map<String, Path> getCandidateDeletingFiles() {
         List<Path> fileDirs = listPaimonFileDirs();
-        try {
-            return FileUtils.COMMON_IO_FORK_JOIN_POOL
-                    .submit(
-                            () ->
-                                    fileDirs.parallelStream()
-                                            .flatMap(p -> 
tryBestListingDirs(p).stream())
-                                            .filter(this::oldEnough)
-                                            .map(FileStatus::getPath)
-                                            .collect(
-                                                    Collectors.toMap(
-                                                            Path::getName, 
Function.identity())))
-                    .get();
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.debug("Failed to get candidate deleting files.", e);
-            return Collections.emptyMap();
+        Function<Path, List<Path>> processor =
+                path ->
+                        tryBestListingDirs(path).stream()
+                                .filter(this::oldEnough)
+                                .map(FileStatus::getPath)
+                                .collect(Collectors.toList());
+        Iterator<Path> allPaths = randomlyExecute(EXECUTOR, processor, 
fileDirs);
+        Map<String, Path> result = new HashMap<>();
+        while (allPaths.hasNext()) {
+            Path next = allPaths.next();
+            result.put(next.getName(), next);
         }
+        return result;
     }
 
     private List<String> getUsedFilesForChangelog(Changelog changelog) {
@@ -496,22 +494,8 @@ public class OrphanFilesClean {
                         partitionKeysNum -> level != partitionKeysNum);
 
         // dive into the next partition level
-        try {
-            return FileUtils.COMMON_IO_FORK_JOIN_POOL
-                    .submit(
-                            () ->
-                                    partitionPaths
-                                            .parallelStream()
-                                            .flatMap(
-                                                    p ->
-                                                            
listAndCleanDataDirs(p, level - 1)
-                                                                    .stream())
-                                            .collect(Collectors.toList()))
-                    .get();
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.debug("Failed to list partition directory {}", dir, e);
-            return Collections.emptyList();
-        }
+        return Lists.newArrayList(
+                randomlyExecute(EXECUTOR, p -> listAndCleanDataDirs(p, level - 
1), partitionPaths));
     }
 
     private List<Path> filterAndCleanDataDirs(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c769aa4a3..5e25fcfd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -33,11 +33,11 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import org.slf4j.Logger;
@@ -55,7 +55,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
@@ -63,9 +62,13 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
 import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
 
 /** An abstraction layer above {@link FileStoreCommit} to provide snapshot 
commit and expiration. */
 public class TableCommitImpl implements InnerTableCommit {
@@ -196,7 +199,7 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     public void commit(ManifestCommittable committable) {
-        commitMultiple(Collections.singletonList(committable), false);
+        commitMultiple(singletonList(committable), false);
     }
 
     public void commitMultiple(List<ManifestCommittable> committables, boolean 
checkAppendFiles) {
@@ -285,22 +288,13 @@ public class TableCommitImpl implements InnerTableCommit {
                         throw new UncheckedIOException(e);
                     }
                 };
-        List<Path> nonExistFiles;
-        try {
-            nonExistFiles =
-                    FileUtils.COMMON_IO_FORK_JOIN_POOL
-                            .submit(
-                                    () ->
-                                            files.parallelStream()
-                                                    .filter(nonExists)
-                                                    
.collect(Collectors.toList()))
-                            .get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException(e.getCause());
-        }
+
+        List<Path> nonExistFiles =
+                Lists.newArrayList(
+                        randomlyExecute(
+                                getExecutorService(null),
+                                f -> nonExists.test(f) ? singletonList(f) : 
emptyList(),
+                                files));
 
         if (nonExistFiles.size() > 0) {
             String message =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index 17823f34d..d714487f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -31,46 +31,11 @@ import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.stream.Stream;
 
 /** Utils for file reading and writing. */
 public class FileUtils {
 
-    public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;
-
-    private static volatile ForkJoinPool scanIoForkJoinPool;
-
-    static {
-        COMMON_IO_FORK_JOIN_POOL =
-                createForkJoinPool(
-                        "file-store-common-io", 
Runtime.getRuntime().availableProcessors());
-    }
-
-    private static ForkJoinPool createForkJoinPool(String namePrefix, int 
parallelism) {
-        // if we want to name threads in the fork join pool we need all these
-        // see https://stackoverflow.com/questions/34303094/
-        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
-                pool -> {
-                    ForkJoinWorkerThread worker =
-                            
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
-                    worker.setName(namePrefix + "-" + worker.getPoolIndex());
-                    return worker;
-                };
-        return new ForkJoinPool(parallelism, factory, null, false);
-    }
-
-    public static synchronized ForkJoinPool getScanIoForkJoinPool(int 
parallelism) {
-        if (scanIoForkJoinPool == null || parallelism > 
scanIoForkJoinPool.getParallelism()) {
-            if (scanIoForkJoinPool != null) {
-                scanIoForkJoinPool.shutdown();
-            }
-            scanIoForkJoinPool = createForkJoinPool("paimon-scan-io", 
parallelism);
-        }
-        return scanIoForkJoinPool;
-    }
-
     /**
      * List versioned files for the directory.
      *
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
similarity index 56%
copy from paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
copy to paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 07b0f5d32..74775f488 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -18,41 +18,37 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.fs.FileIO;
+import javax.annotation.Nullable;
 
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
-import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
-/** Thread pool to delete files using {@link FileIO}. */
-public class FileDeletionThreadPool {
+/** Thread pool to read manifests. */
+public class ManifestReadThreadPool {
+
+    private static final String THREAD_NAME = "MANIFEST-READ-THREAD-POOL";
 
     private static ThreadPoolExecutor executorService =
-            createCachedThreadPool(Runtime.getRuntime().availableProcessors());
+            createCachedThreadPool(Runtime.getRuntime().availableProcessors(), 
THREAD_NAME);
 
-    public static synchronized ThreadPoolExecutor getExecutorService(int 
threadNum) {
-        if (threadNum <= executorService.getMaximumPoolSize()) {
+    public static synchronized ThreadPoolExecutor getExecutorService(@Nullable 
Integer threadNum) {
+        if (threadNum == null || threadNum <= 
executorService.getMaximumPoolSize()) {
             return executorService;
         }
         // we don't need to close previous pool
         // it is just cached pool
-        executorService = createCachedThreadPool(threadNum);
+        executorService = createCachedThreadPool(threadNum, THREAD_NAME);
 
         return executorService;
     }
 
-    private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
-        ThreadPoolExecutor executor =
-                new ThreadPoolExecutor(
-                        threadNum,
-                        threadNum,
-                        1,
-                        TimeUnit.MINUTES,
-                        new LinkedBlockingQueue<>(),
-                        newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
-        executor.allowCoreThreadTimeOut(true);
-        return executor;
+    /** This method aims to parallel process tasks with memory control and 
sequentially. */
+    public static <T, U> Iterable<T> sequentialBatchedExecute(
+            Function<U, List<T>> processor, List<U> input, @Nullable Integer 
threadNum) {
+        ThreadPoolExecutor executor = getExecutorService(threadNum);
+        return ThreadPoolUtils.sequentialBatchedExecute(executor, processor, 
input, threadNum);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
deleted file mode 100644
index 6bf6b3517..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.utils;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Function;
-
-import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
-
-/**
- * This class is a parallel execution util class, which mainly aim to process 
tasks parallelly with
- * memory control.
- */
-public class ScanParallelExecutor {
-
-    // reduce memory usage by batch iterable process, the cached result in 
memory will be queueSize
-    public static <T, U> Iterable<T> parallelismBatchIterable(
-            Function<List<U>, List<T>> processor, List<U> input, @Nullable 
Integer queueSize) {
-        if (queueSize == null) {
-            queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
-        } else if (queueSize <= 0) {
-            throw new NegativeArraySizeException("queue size should not be 
negative");
-        }
-
-        final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input, 
queueSize));
-        final int settledQueueSize = queueSize;
-        return () ->
-                new Iterator<T>() {
-                    List<T> activeList = null;
-                    private int index = 0;
-
-                    @Override
-                    public boolean hasNext() {
-                        advanceIfNeeded();
-                        return activeList != null && index < activeList.size();
-                    }
-
-                    @Override
-                    public T next() {
-                        advanceIfNeeded();
-                        if (activeList == null || index >= activeList.size()) {
-                            throw new NoSuchElementException();
-                        }
-                        return activeList.get(index++);
-                    }
-
-                    private void advanceIfNeeded() {
-                        while ((activeList == null || index >= 
activeList.size())
-                                && !stack.isEmpty()) {
-                            // reset index
-                            index = 0;
-                            try {
-                                activeList =
-                                        CompletableFuture.supplyAsync(
-                                                        () -> 
processor.apply(stack.poll()),
-                                                        
getExecutePool(settledQueueSize))
-                                                .get();
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        }
-                    }
-                };
-    }
-
-    public static ForkJoinPool getExecutePool(@Nullable Integer queueSize) {
-        if (queueSize == null) {
-            return COMMON_IO_FORK_JOIN_POOL;
-        }
-
-        return queueSize > COMMON_IO_FORK_JOIN_POOL.getParallelism()
-                ? FileUtils.getScanIoForkJoinPool(queueSize)
-                : COMMON_IO_FORK_JOIN_POOL;
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
similarity index 68%
rename from 
paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
index e312a418e..8c131bdc5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
@@ -27,10 +27,13 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-/** This test mainly test for the methods in {@link ScanParallelExecutor}. */
-public class ScanParallelExecutorTest {
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+
+/** This test mainly test for the methods in {@link ManifestReadThreadPool}. */
+public class ManifestReadThreadPoolTest {
 
     @Test
     public void testParallelismBatchIterable() {
@@ -40,11 +43,7 @@ public class ScanParallelExecutorTest {
             nums.add(i);
         }
 
-        Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        nums,
-                        null);
+        Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 
1), nums, null);
 
         AtomicInteger atomicInteger = new AtomicInteger(0);
         re.forEach(
@@ -61,11 +60,7 @@ public class ScanParallelExecutorTest {
             nums.add(i);
         }
 
-        Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        nums,
-                        null);
+        Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 
1), nums, null);
 
         AtomicInteger atomicInteger = new AtomicInteger(0);
         re.forEach(
@@ -82,11 +77,7 @@ public class ScanParallelExecutorTest {
             nums.add(i);
         }
 
-        Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        nums,
-                        null);
+        Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 
1), nums, null);
 
         Iterator<Integer> iterator = re.iterator();
         for (int i = 0; i < 100; i++) {
@@ -108,11 +99,7 @@ public class ScanParallelExecutorTest {
             nums.add(i);
         }
 
-        Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        nums,
-                        null);
+        Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 
1), nums, null);
 
         Iterator<Integer> iterator = re.iterator();
         for (int i = 0; i < 123; i++) {
@@ -129,20 +116,15 @@ public class ScanParallelExecutorTest {
     @Test
     public void testForEmptyInput() {
         Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        (List<Integer>) Collections.EMPTY_LIST,
-                        null);
+                sequentialBatchedExecute(
+                        i -> singletonList(i + 1), (List<Integer>) 
Collections.EMPTY_LIST, null);
         Assertions.assertThat(!re.iterator().hasNext()).isTrue();
     }
 
     @Test
     public void testForSingletonInput() {
         Iterable<Integer> re =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
-                        Collections.singletonList(1),
-                        null);
+                sequentialBatchedExecute(i -> singletonList(i + 1), 
singletonList(1), null);
         re.forEach(i -> Assertions.assertThat(i).isEqualTo(2));
     }
 
@@ -150,8 +132,8 @@ public class ScanParallelExecutorTest {
     public void testDifferentQueueSizeWithFilterElement() {
         for (int queueSize = 1; queueSize < 20; queueSize++) {
             Iterable<Integer> re =
-                    ScanParallelExecutor.parallelismBatchIterable(
-                            l -> l.parallelStream().filter(i -> i > 
5).collect(Collectors.toList()),
+                    sequentialBatchedExecute(
+                            i -> i > 5 ? singletonList(i) : emptyList(),
                             Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
                             queueSize);
             Integer[] result = new Integer[] {6, 7, 8, 9, 10};
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 01c2704de..379c61f70 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -54,18 +54,22 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
-import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** Migrate hive table to paimon table. */
 public class HiveMigrator implements Migrator {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveMigrator.class);
 
+    private static final ThreadPoolExecutor EXECUTOR =
+            createCachedThreadPool(Runtime.getRuntime().availableProcessors(), 
"HIVE_MIGRATOR");
+
     private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
             p -> !p.getPath().getName().startsWith("_") && 
!p.getPath().getName().startsWith(".");
 
@@ -171,9 +175,7 @@ public class HiveMigrator implements Migrator {
             }
 
             List<Future<CommitMessage>> futures =
-                    tasks.stream()
-                            .map(COMMON_IO_FORK_JOIN_POOL::submit)
-                            .collect(Collectors.toList());
+                    
tasks.stream().map(EXECUTOR::submit).collect(Collectors.toList());
             List<CommitMessage> commitMessages = new ArrayList<>();
             try {
                 for (Future<CommitMessage> future : futures) {


Reply via email to