This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c11b95028 [core] Refactor ForkJoinPool to cached thread pool (#3914)
c11b95028 is described below
commit c11b95028e383051ce21a7413e710b33d8b81e6e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 7 13:14:43 2024 +0800
[core] Refactor ForkJoinPool to cached thread pool (#3914)
---
.../paimon/utils/FileDeletionThreadPool.java | 23 +--
.../org/apache/paimon/utils/ThreadPoolUtils.java | 173 +++++++++++++++++++++
.../java/org/apache/paimon/manifest/FileEntry.java | 12 +-
.../paimon/operation/AbstractFileStoreScan.java | 64 ++++----
.../apache/paimon/operation/OrphanFilesClean.java | 84 ++++------
.../apache/paimon/table/sink/TableCommitImpl.java | 32 ++--
.../java/org/apache/paimon/utils/FileUtils.java | 35 -----
.../paimon/utils/ManifestReadThreadPool.java | 38 ++---
.../apache/paimon/utils/ScanParallelExecutor.java | 101 ------------
...orTest.java => ManifestReadThreadPoolTest.java} | 48 ++----
.../apache/paimon/hive/migrate/HiveMigrator.java | 10 +-
11 files changed, 296 insertions(+), 324 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
index 07b0f5d32..638d6f9a4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
@@ -20,17 +20,17 @@ package org.apache.paimon.utils;
import org.apache.paimon.fs.FileIO;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** Thread pool to delete files using {@link FileIO}. */
public class FileDeletionThreadPool {
+ private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL";
+
private static ThreadPoolExecutor executorService =
- createCachedThreadPool(Runtime.getRuntime().availableProcessors());
+ createCachedThreadPool(Runtime.getRuntime().availableProcessors(),
THREAD_NAME);
public static synchronized ThreadPoolExecutor getExecutorService(int
threadNum) {
if (threadNum <= executorService.getMaximumPoolSize()) {
@@ -38,21 +38,8 @@ public class FileDeletionThreadPool {
}
// we don't need to close previous pool
// it is just cached pool
- executorService = createCachedThreadPool(threadNum);
+ executorService = createCachedThreadPool(threadNum, THREAD_NAME);
return executorService;
}
-
- private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
- ThreadPoolExecutor executor =
- new ThreadPoolExecutor(
- threadNum,
- threadNum,
- 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(),
- newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
- executor.allowCoreThreadTimeOut(true);
- return executor;
- }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
new file mode 100644
index 000000000..6111b0e0f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+
+/** Utils for thread pool. */
+public class ThreadPoolUtils {
+
+ /**
+ * Create a thread pool with a maximum number of threads. Idle threads exit automatically.
+ *
+ * <p>{@link Executors#newCachedThreadPool} cannot limit the maximum number of threads. Non-core
+ * threads must be used with a {@link SynchronousQueue}, but a synchronous queue cannot accept
+ * new tasks once the maximum number of threads is reached.
+ */
+ public static ThreadPoolExecutor createCachedThreadPool(int threadNum,
String namePrefix) {
+ ThreadPoolExecutor executor =
+ new ThreadPoolExecutor(
+ threadNum,
+ threadNum,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(),
+ newDaemonThreadFactory(namePrefix));
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ /** Processes tasks in parallel, one batch at a time to limit memory, yielding results in order. */
+ public static <T, U> Iterable<T> sequentialBatchedExecute(
+ ThreadPoolExecutor executor,
+ Function<U, List<T>> processor,
+ List<U> input,
+ @Nullable Integer queueSize) {
+ if (queueSize == null) {
+ queueSize = executor.getMaximumPoolSize();
+ }
+ if (queueSize <= 0) {
+ throw new NegativeArraySizeException("queue size should not be
negative");
+ }
+
+ final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input,
queueSize));
+ return () ->
+ new Iterator<T>() {
+ Iterator<T> activeList = null;
+ T next = null;
+
+ @Override
+ public boolean hasNext() {
+ advanceIfNeeded();
+ return next != null;
+ }
+
+ @Override
+ public T next() {
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+
+ T result = next;
+ next = null;
+ return result;
+ }
+
+ private void advanceIfNeeded() {
+ while (next == null) {
+ if (activeList != null && activeList.hasNext()) {
+ next = activeList.next();
+ } else {
+ if (stack.isEmpty()) {
+ return;
+ }
+ activeList = randomlyExecute(executor,
processor, stack.poll());
+ }
+ }
+ }
+ };
+ }
+
+ public static <U> void randomlyOnlyExecute(
+ ExecutorService executor, Consumer<U> processor, Collection<U>
input) {
+ List<Future<?>> futures = new ArrayList<>(input.size());
+ for (U u : input) {
+ futures.add(executor.submit(() -> processor.accept(u)));
+ }
+ awaitAllFutures(futures);
+ }
+
+ public static <U, T> Iterator<T> randomlyExecute(
+ ExecutorService executor, Function<U, List<T>> processor,
Collection<U> input) {
+ List<Future<List<T>>> futures = new ArrayList<>(input.size());
+ for (U u : input) {
+ futures.add(executor.submit(() -> processor.apply(u)));
+ }
+ return futuresToIterIter(futures);
+ }
+
+ private static <T> Iterator<T> futuresToIterIter(List<Future<List<T>>>
futures) {
+ Queue<Future<List<T>>> queue = new ArrayDeque<>(futures);
+ return Iterators.concat(
+ new Iterator<Iterator<T>>() {
+ @Override
+ public boolean hasNext() {
+ return !queue.isEmpty();
+ }
+
+ @Override
+ public Iterator<T> next() {
+ try {
+ return queue.poll().get().iterator();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ private static void awaitAllFutures(List<Future<?>> futures) {
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 741d92296..3efc4ea19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -21,7 +21,6 @@ package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.ScanParallelExecutor;
import javax.annotation.Nullable;
@@ -30,7 +29,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
+
+import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
/** Entry representing a file. */
public interface FileEntry {
@@ -156,12 +156,8 @@ public interface FileEntry {
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
@Nullable Integer manifestReadParallelism) {
- return ScanParallelExecutor.parallelismBatchIterable(
- files ->
- files.parallelStream()
- .flatMap(
- m -> manifestFile.read(m.fileName(),
m.fileSize()).stream())
- .collect(Collectors.toList()),
+ return sequentialBatchedExecute(
+ file -> manifestFile.read(file.fileName(), file.fileSize()),
manifestFiles,
manifestReadParallelism);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 5e6f914fe..2c753e270 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -41,7 +41,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ScanParallelExecutor;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -54,14 +53,16 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
@@ -261,22 +262,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
public List<PartitionEntry> readPartitionEntries() {
List<ManifestFileMeta> manifests = readManifests().getRight();
Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
- // Don't need to use parallelismBatchIterable here
// Can be executed in disorder
- ForkJoinPool executePool =
ScanParallelExecutor.getExecutePool(scanManifestParallelism);
- List<ForkJoinTask<?>> tasks = new ArrayList<>();
- for (ManifestFileMeta manifest : manifests) {
- ForkJoinTask<?> task =
- executePool.submit(
- () ->
- PartitionEntry.merge(
-
PartitionEntry.merge(readManifestFileMeta(manifest)),
- partitions));
- tasks.add(task);
- }
- for (ForkJoinTask<?> task : tasks) {
- task.join();
- }
+ ThreadPoolExecutor executor =
getExecutorService(scanManifestParallelism);
+ Consumer<ManifestFileMeta> processor =
+ m ->
+ PartitionEntry.merge(
+ PartitionEntry.merge(readManifestFileMeta(m)),
partitions);
+ randomlyOnlyExecute(executor, processor, manifests);
return partitions.values().stream()
.filter(p -> p.fileCount() > 0)
.collect(Collectors.toList());
@@ -371,22 +363,24 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
List<ManifestFileMeta> manifests,
Function<ManifestFileMeta, List<T>> manifestReader,
@Nullable Filter<T> filterUnmergedEntry) {
- Iterable<T> entries =
- ScanParallelExecutor.parallelismBatchIterable(
- files -> {
- Stream<T> stream =
- files.parallelStream()
-
.filter(this::filterManifestFileMeta)
- .flatMap(m ->
manifestReader.apply(m).stream());
- if (filterUnmergedEntry != null) {
- stream =
stream.filter(filterUnmergedEntry::test);
- }
- return stream.collect(Collectors.toList());
- },
- manifests,
- scanManifestParallelism);
-
- return FileEntry.mergeEntries(entries);
+ // filter the manifest metas in memory first, so that only the remaining ones are read
+ manifests =
+ manifests.stream()
+ .filter(this::filterManifestFileMeta)
+ .collect(Collectors.toList());
+ Function<ManifestFileMeta, List<T>> reader =
+ file -> {
+ List<T> entries = manifestReader.apply(file);
+ if (filterUnmergedEntry != null) {
+ entries =
+ entries.stream()
+ .filter(filterUnmergedEntry::test)
+ .collect(Collectors.toList());
+ }
+ return entries;
+ };
+ return FileEntry.mergeEntries(
+ sequentialBatchedExecute(reader, manifests,
scanManifestParallelism));
}
private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index b0b06c33d..f0d49964c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -36,10 +36,12 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +52,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -59,6 +63,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -67,6 +72,8 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
/**
* To remove the data files and metadata files that are not used by table
(so-called "orphan
@@ -87,6 +94,10 @@ public class OrphanFilesClean {
private static final Logger LOG =
LoggerFactory.getLogger(OrphanFilesClean.class);
+ private static final ThreadPoolExecutor EXECUTOR =
+ createCachedThreadPool(
+ Runtime.getRuntime().availableProcessors(),
"ORPHAN_FILES_CLEAN");
+
private static final int READ_FILE_RETRY_NUM = 3;
private static final int READ_FILE_RETRY_INTERVAL = 5;
private static final int SHOW_LIMIT = 200;
@@ -177,25 +188,15 @@ public class OrphanFilesClean {
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
+ return Sets.newHashSet(randomlyExecute(EXECUTOR, this::getUsedFiles,
readSnapshots));
+ }
- return FileUtils.COMMON_IO_FORK_JOIN_POOL
- .submit(
- () ->
- readSnapshots
- .parallelStream()
- .flatMap(
- snapshot -> {
- if (snapshot instanceof
Changelog) {
- return
getUsedFilesForChangelog(
- (Changelog)
snapshot)
- .stream();
- } else {
- return
getUsedFilesForSnapshot(snapshot)
- .stream();
- }
- })
- .collect(Collectors.toSet()))
- .get();
+ private List<String> getUsedFiles(Snapshot snapshot) {
+ if (snapshot instanceof Changelog) {
+ return getUsedFilesForChangelog((Changelog) snapshot);
+ } else {
+ return getUsedFilesForSnapshot(snapshot);
+ }
}
/**
@@ -204,22 +205,19 @@ public class OrphanFilesClean {
*/
private Map<String, Path> getCandidateDeletingFiles() {
List<Path> fileDirs = listPaimonFileDirs();
- try {
- return FileUtils.COMMON_IO_FORK_JOIN_POOL
- .submit(
- () ->
- fileDirs.parallelStream()
- .flatMap(p ->
tryBestListingDirs(p).stream())
- .filter(this::oldEnough)
- .map(FileStatus::getPath)
- .collect(
- Collectors.toMap(
- Path::getName,
Function.identity())))
- .get();
- } catch (ExecutionException | InterruptedException e) {
- LOG.debug("Failed to get candidate deleting files.", e);
- return Collections.emptyMap();
+ Function<Path, List<Path>> processor =
+ path ->
+ tryBestListingDirs(path).stream()
+ .filter(this::oldEnough)
+ .map(FileStatus::getPath)
+ .collect(Collectors.toList());
+ Iterator<Path> allPaths = randomlyExecute(EXECUTOR, processor,
fileDirs);
+ Map<String, Path> result = new HashMap<>();
+ while (allPaths.hasNext()) {
+ Path next = allPaths.next();
+ result.put(next.getName(), next);
}
+ return result;
}
private List<String> getUsedFilesForChangelog(Changelog changelog) {
@@ -496,22 +494,8 @@ public class OrphanFilesClean {
partitionKeysNum -> level != partitionKeysNum);
// dive into the next partition level
- try {
- return FileUtils.COMMON_IO_FORK_JOIN_POOL
- .submit(
- () ->
- partitionPaths
- .parallelStream()
- .flatMap(
- p ->
-
listAndCleanDataDirs(p, level - 1)
- .stream())
- .collect(Collectors.toList()))
- .get();
- } catch (ExecutionException | InterruptedException e) {
- LOG.debug("Failed to list partition directory {}", dir, e);
- return Collections.emptyList();
- }
+ return Lists.newArrayList(
+ randomlyExecute(EXECUTOR, p -> listAndCleanDataDirs(p, level -
1), partitionPaths));
}
private List<Path> filterAndCleanDataDirs(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c769aa4a3..5e25fcfd0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -33,11 +33,11 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
@@ -55,7 +55,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -63,9 +62,13 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
/** An abstraction layer above {@link FileStoreCommit} to provide snapshot
commit and expiration. */
public class TableCommitImpl implements InnerTableCommit {
@@ -196,7 +199,7 @@ public class TableCommitImpl implements InnerTableCommit {
}
public void commit(ManifestCommittable committable) {
- commitMultiple(Collections.singletonList(committable), false);
+ commitMultiple(singletonList(committable), false);
}
public void commitMultiple(List<ManifestCommittable> committables, boolean
checkAppendFiles) {
@@ -285,22 +288,13 @@ public class TableCommitImpl implements InnerTableCommit {
throw new UncheckedIOException(e);
}
};
- List<Path> nonExistFiles;
- try {
- nonExistFiles =
- FileUtils.COMMON_IO_FORK_JOIN_POOL
- .submit(
- () ->
- files.parallelStream()
- .filter(nonExists)
-
.collect(Collectors.toList()))
- .get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- }
+
+ List<Path> nonExistFiles =
+ Lists.newArrayList(
+ randomlyExecute(
+ getExecutorService(null),
+ f -> nonExists.test(f) ? singletonList(f) :
emptyList(),
+ files));
if (nonExistFiles.size() > 0) {
String message =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index 17823f34d..d714487f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -31,46 +31,11 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Stream;
/** Utils for file reading and writing. */
public class FileUtils {
- public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;
-
- private static volatile ForkJoinPool scanIoForkJoinPool;
-
- static {
- COMMON_IO_FORK_JOIN_POOL =
- createForkJoinPool(
- "file-store-common-io",
Runtime.getRuntime().availableProcessors());
- }
-
- private static ForkJoinPool createForkJoinPool(String namePrefix, int
parallelism) {
- // if we want to name threads in the fork join pool we need all these
- // see https://stackoverflow.com/questions/34303094/
- ForkJoinPool.ForkJoinWorkerThreadFactory factory =
- pool -> {
- ForkJoinWorkerThread worker =
-
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
- worker.setName(namePrefix + "-" + worker.getPoolIndex());
- return worker;
- };
- return new ForkJoinPool(parallelism, factory, null, false);
- }
-
- public static synchronized ForkJoinPool getScanIoForkJoinPool(int
parallelism) {
- if (scanIoForkJoinPool == null || parallelism >
scanIoForkJoinPool.getParallelism()) {
- if (scanIoForkJoinPool != null) {
- scanIoForkJoinPool.shutdown();
- }
- scanIoForkJoinPool = createForkJoinPool("paimon-scan-io",
parallelism);
- }
- return scanIoForkJoinPool;
- }
-
/**
* List versioned files for the directory.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
similarity index 56%
copy from
paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
copy to
paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 07b0f5d32..74775f488 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -18,41 +18,37 @@
package org.apache.paimon.utils;
-import org.apache.paimon.fs.FileIO;
+import javax.annotation.Nullable;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
-import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-/** Thread pool to delete files using {@link FileIO}. */
-public class FileDeletionThreadPool {
+/** Thread pool to read manifests. */
+public class ManifestReadThreadPool {
+
+ private static final String THREAD_NAME = "MANIFEST-READ-THREAD-POOL";
private static ThreadPoolExecutor executorService =
- createCachedThreadPool(Runtime.getRuntime().availableProcessors());
+ createCachedThreadPool(Runtime.getRuntime().availableProcessors(),
THREAD_NAME);
- public static synchronized ThreadPoolExecutor getExecutorService(int
threadNum) {
- if (threadNum <= executorService.getMaximumPoolSize()) {
+ public static synchronized ThreadPoolExecutor getExecutorService(@Nullable
Integer threadNum) {
+ if (threadNum == null || threadNum <=
executorService.getMaximumPoolSize()) {
return executorService;
}
// we don't need to close previous pool
// it is just cached pool
- executorService = createCachedThreadPool(threadNum);
+ executorService = createCachedThreadPool(threadNum, THREAD_NAME);
return executorService;
}
- private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
- ThreadPoolExecutor executor =
- new ThreadPoolExecutor(
- threadNum,
- threadNum,
- 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(),
- newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
- executor.allowCoreThreadTimeOut(true);
- return executor;
+ /** Processes tasks in parallel, one batch at a time to limit memory, yielding results in order. */
+ public static <T, U> Iterable<T> sequentialBatchedExecute(
+ Function<U, List<T>> processor, List<U> input, @Nullable Integer
threadNum) {
+ ThreadPoolExecutor executor = getExecutorService(threadNum);
+ return ThreadPoolUtils.sequentialBatchedExecute(executor, processor,
input, threadNum);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
deleted file mode 100644
index 6bf6b3517..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.utils;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Function;
-
-import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
-
-/**
- * This class is a parallel execution util class, which mainly aim to process
tasks parallelly with
- * memory control.
- */
-public class ScanParallelExecutor {
-
- // reduce memory usage by batch iterable process, the cached result in
memory will be queueSize
- public static <T, U> Iterable<T> parallelismBatchIterable(
- Function<List<U>, List<T>> processor, List<U> input, @Nullable
Integer queueSize) {
- if (queueSize == null) {
- queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
- } else if (queueSize <= 0) {
- throw new NegativeArraySizeException("queue size should not be
negative");
- }
-
- final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input,
queueSize));
- final int settledQueueSize = queueSize;
- return () ->
- new Iterator<T>() {
- List<T> activeList = null;
- private int index = 0;
-
- @Override
- public boolean hasNext() {
- advanceIfNeeded();
- return activeList != null && index < activeList.size();
- }
-
- @Override
- public T next() {
- advanceIfNeeded();
- if (activeList == null || index >= activeList.size()) {
- throw new NoSuchElementException();
- }
- return activeList.get(index++);
- }
-
- private void advanceIfNeeded() {
- while ((activeList == null || index >=
activeList.size())
- && !stack.isEmpty()) {
- // reset index
- index = 0;
- try {
- activeList =
- CompletableFuture.supplyAsync(
- () ->
processor.apply(stack.poll()),
-
getExecutePool(settledQueueSize))
- .get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- };
- }
-
- public static ForkJoinPool getExecutePool(@Nullable Integer queueSize) {
- if (queueSize == null) {
- return COMMON_IO_FORK_JOIN_POOL;
- }
-
- return queueSize > COMMON_IO_FORK_JOIN_POOL.getParallelism()
- ? FileUtils.getScanIoForkJoinPool(queueSize)
- : COMMON_IO_FORK_JOIN_POOL;
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
similarity index 68%
rename from
paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
index e312a418e..8c131bdc5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/ScanParallelExecutorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/ManifestReadThreadPoolTest.java
@@ -27,10 +27,13 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-/** This test mainly test for the methods in {@link ScanParallelExecutor}. */
-public class ScanParallelExecutorTest {
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+
+/** This test mainly tests the methods in {@link ManifestReadThreadPool}. */
+public class ManifestReadThreadPoolTest {
@Test
public void testParallelismBatchIterable() {
@@ -40,11 +43,7 @@ public class ScanParallelExecutorTest {
nums.add(i);
}
- Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i +
1).collect(Collectors.toList()),
- nums,
- null);
+ Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i +
1), nums, null);
AtomicInteger atomicInteger = new AtomicInteger(0);
re.forEach(
@@ -61,11 +60,7 @@ public class ScanParallelExecutorTest {
nums.add(i);
}
- Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
- nums,
- null);
+ Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 1), nums, null);
AtomicInteger atomicInteger = new AtomicInteger(0);
re.forEach(
@@ -82,11 +77,7 @@ public class ScanParallelExecutorTest {
nums.add(i);
}
- Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
- nums,
- null);
+ Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 1), nums, null);
Iterator<Integer> iterator = re.iterator();
for (int i = 0; i < 100; i++) {
@@ -108,11 +99,7 @@ public class ScanParallelExecutorTest {
nums.add(i);
}
- Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
- nums,
- null);
+ Iterable<Integer> re = sequentialBatchedExecute(i -> singletonList(i + 1), nums, null);
Iterator<Integer> iterator = re.iterator();
for (int i = 0; i < 123; i++) {
@@ -129,20 +116,15 @@ public class ScanParallelExecutorTest {
@Test
public void testForEmptyInput() {
Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
- (List<Integer>) Collections.EMPTY_LIST,
- null);
+ sequentialBatchedExecute(
+ i -> singletonList(i + 1), (List<Integer>) Collections.EMPTY_LIST, null);
Assertions.assertThat(!re.iterator().hasNext()).isTrue();
}
@Test
public void testForSingletonInput() {
Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
- Collections.singletonList(1),
- null);
+ sequentialBatchedExecute(i -> singletonList(i + 1), singletonList(1), null);
re.forEach(i -> Assertions.assertThat(i).isEqualTo(2));
}
@@ -150,8 +132,8 @@ public class ScanParallelExecutorTest {
public void testDifferentQueueSizeWithFilterElement() {
for (int queueSize = 1; queueSize < 20; queueSize++) {
Iterable<Integer> re =
- ScanParallelExecutor.parallelismBatchIterable(
- l -> l.parallelStream().filter(i -> i > 5).collect(Collectors.toList()),
+ sequentialBatchedExecute(
+ i -> i > 5 ? singletonList(i) : emptyList(),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
queueSize);
Integer[] result = new Integer[] {6, 7, 8, 9, 10};
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 01c2704de..379c61f70 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -54,18 +54,22 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
-import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** Migrate hive table to paimon table. */
public class HiveMigrator implements Migrator {
private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
+ private static final ThreadPoolExecutor EXECUTOR =
+ createCachedThreadPool(Runtime.getRuntime().availableProcessors(), "HIVE_MIGRATOR");
+
private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
@@ -171,9 +175,7 @@ public class HiveMigrator implements Migrator {
}
List<Future<CommitMessage>> futures =
- tasks.stream()
- .map(COMMON_IO_FORK_JOIN_POOL::submit)
- .collect(Collectors.toList());
+ tasks.stream().map(EXECUTOR::submit).collect(Collectors.toList());
List<CommitMessage> commitMessages = new ArrayList<>();
try {
for (Future<CommitMessage> future : futures) {