This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae77340cb [core] Introduce delete-file.thread-num (#3751)
ae77340cb is described below
commit ae77340cbab1aff56a2b8d78ff78c6366be6fb60
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 16 14:44:36 2024 +0800
[core] Introduce delete-file.thread-num (#3751)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 13 +++++
.../paimon/utils/FileDeletionThreadPool.java | 55 ++++++++++++++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 9 ++--
.../apache/paimon/operation/ChangelogDeletion.java | 6 ++-
.../apache/paimon/operation/FileDeletionBase.java | 18 +++----
.../apache/paimon/operation/SnapshotDeletion.java | 6 ++-
.../org/apache/paimon/operation/TagDeletion.java | 6 ++-
8 files changed, 102 insertions(+), 17 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 3e5709606..89db43b43 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -194,6 +194,12 @@ under the License.
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary
keys not contain all partition fields), this can avoid maintaining too many
indexes and lead to worse and worse performance, but please note that this may
also cause data duplication.</td>
</tr>
+ <tr>
+ <td><h5>delete-file.thread-num</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>The maximum number of concurrent deleting files. By default is
the number of processors available to the Java virtual machine.</td>
+ </tr>
<tr>
<td><h5>delete.force-produce-changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index d5bee40e8..8864bc3d3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1284,6 +1284,14 @@ public class CoreOptions implements Serializable {
"When set to true, produce Iceberg metadata after
a snapshot is committed, "
+ "so that Iceberg readers can read
Paimon's raw files.");
+ public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
+ key("delete-file.thread-num")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The maximum number of concurrent deleting files. "
+ + "By default is the number of processors
available to the Java virtual machine.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1508,6 +1516,11 @@ public class CoreOptions implements Serializable {
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
}
+ public int deleteFileThreadNum() {
+ return options.getOptional(DELETE_FILE_THREAD_NUM)
+ .orElseGet(() -> Runtime.getRuntime().availableProcessors());
+ }
+
public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
new file mode 100644
index 000000000..51b208930
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.FileIO;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+
+/** Thread pool to delete files using {@link FileIO}. */
+public class FileDeletionThreadPool {
+
+ private static ThreadPoolExecutor executorService =
+ createCachedThreadPool(Runtime.getRuntime().availableProcessors());
+
+ public static synchronized ThreadPoolExecutor getExecutorService(int
threadNum) {
+ if (threadNum <= executorService.getMaximumPoolSize()) {
+ return executorService;
+ }
+ // we don't need to close previous pool
+ // it is just cached pool
+ executorService = createCachedThreadPool(threadNum);
+
+ return executorService;
+ }
+
+ private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
+ return new ThreadPoolExecutor(
+ 0,
+ threadNum,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(),
+ newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index bdbeedd19..e33463d57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -218,7 +218,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
newIndexFileHandler(),
newStatsFileHandler(),
options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
- options.cleanEmptyDirectories());
+ options.cleanEmptyDirectories(),
+ options.deleteFileThreadNum());
}
@Override
@@ -230,7 +231,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler(),
- options.cleanEmptyDirectories());
+ options.cleanEmptyDirectories(),
+ options.deleteFileThreadNum());
}
@Override
@@ -247,7 +249,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler(),
- options.cleanEmptyDirectories());
+ options.cleanEmptyDirectories(),
+ options.deleteFileThreadNum());
}
public abstract Comparator<InternalRow> newKeyComparator();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index ea67f7e45..a73c0a078 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -46,7 +46,8 @@ public class ChangelogDeletion extends
FileDeletionBase<Changelog> {
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
- boolean cleanEmptyDirectories) {
+ boolean cleanEmptyDirectories,
+ int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
@@ -54,7 +55,8 @@ public class ChangelogDeletion extends
FileDeletionBase<Changelog> {
manifestList,
indexFileHandler,
statsFileHandler,
- cleanEmptyDirectories);
+ cleanEmptyDirectories,
+ deleteFileThreadNum);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index fb09acdc2..7c8d6656b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -32,8 +32,8 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.utils.FileDeletionThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;
@@ -71,9 +71,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
protected final IndexFileHandler indexFileHandler;
protected final StatsFileHandler statsFileHandler;
private final boolean cleanEmptyDirectories;
-
protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
- protected final Executor ioExecutor;
+
+ private final Executor deleteFileExecutor;
+
protected boolean changelogDecoupled;
/** Used to record which tag is cached. */
@@ -89,7 +90,8 @@ public abstract class FileDeletionBase<T extends Snapshot> {
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
- boolean cleanEmptyDirectories) {
+ boolean cleanEmptyDirectories,
+ int deleteFileThreadNum) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
this.manifestFile = manifestFile;
@@ -98,7 +100,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
this.statsFileHandler = statsFileHandler;
this.cleanEmptyDirectories = cleanEmptyDirectories;
this.deletionBuckets = new HashMap<>();
- this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL;
+ this.deleteFileExecutor =
FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
}
/**
@@ -455,15 +457,15 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
}
}
- protected <T> void deleteFiles(Collection<T> files, Consumer<T> deletion) {
+ protected <F> void deleteFiles(Collection<F> files, Consumer<F> deletion) {
if (files.isEmpty()) {
return;
}
List<CompletableFuture<Void>> deletionFutures = new
ArrayList<>(files.size());
- for (T file : files) {
+ for (F file : files) {
deletionFutures.add(
- CompletableFuture.runAsync(() -> deletion.accept(file),
ioExecutor));
+ CompletableFuture.runAsync(() -> deletion.accept(file),
deleteFileExecutor));
}
try {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index 91400ae50..d86907ece 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -50,7 +50,8 @@ public class SnapshotDeletion extends
FileDeletionBase<Snapshot> {
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
boolean produceChangelog,
- boolean cleanEmptyDirectories) {
+ boolean cleanEmptyDirectories,
+ int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
@@ -58,7 +59,8 @@ public class SnapshotDeletion extends
FileDeletionBase<Snapshot> {
manifestList,
indexFileHandler,
statsFileHandler,
- cleanEmptyDirectories);
+ cleanEmptyDirectories,
+ deleteFileThreadNum);
this.produceChangelog = produceChangelog;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 74eeaee90..a6cd338d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -54,7 +54,8 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
- boolean cleanEmptyDirectories) {
+ boolean cleanEmptyDirectories,
+ int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
@@ -62,7 +63,8 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
manifestList,
indexFileHandler,
statsFileHandler,
- cleanEmptyDirectories);
+ cleanEmptyDirectories,
+ deleteFileThreadNum);
}
@Override