This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f684400125ea758ceb70afb82ee969ec857d1082 Author: Jingsong Lee <[email protected]> AuthorDate: Fri Sep 26 21:10:14 2025 +0800 [core] Apply 'file-operation-thread-num' to commit (#6339) --- .../shortcodes/generated/core_configuration.html | 12 +++++------ .../main/java/org/apache/paimon/CoreOptions.java | 11 +++++----- ...hreadPool.java => FileOperationThreadPool.java} | 6 +++--- .../java/org/apache/paimon/AbstractFileStore.java | 6 +++--- .../apache/paimon/operation/FileDeletionBase.java | 4 ++-- .../paimon/operation/ListUnexistingFiles.java | 7 +++---- .../paimon/operation/LocalOrphanFilesClean.java | 4 ++-- .../paimon/table/AbstractFileStoreTable.java | 3 ++- .../apache/paimon/table/sink/TableCommitImpl.java | 24 +++++++++------------- .../apache/paimon/operation/FileDeletionTest.java | 4 ++-- 10 files changed, 39 insertions(+), 42 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 2ca6b6c7bc..0a6f7f2eb9 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -362,12 +362,6 @@ under the License. <td>Boolean</td> <td>Enable data file thin mode to avoid duplicate columns storage.</td> </tr> - <tr> - <td><h5>delete-file.thread-num</h5></td> - <td style="word-wrap: break-word;">(none)</td> - <td>Integer</td> - <td>The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</td> - </tr> <tr> <td><h5>delete.force-produce-changelog</h5></td> <td style="word-wrap: break-word;">false</td> @@ -452,6 +446,12 @@ under the License. <td>Boolean</td> <td>Whether enabled read file index.</td> </tr> + <tr> + <td><h5>file-operation.thread-num</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Integer</td> + <td>The maximum number of concurrent file operations. By default is the number of processors available to the Java virtual machine.</td> + </tr> <tr> <td><h5>file-reader-async-threshold</h5></td> <td style="word-wrap: break-word;">10 mb</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ad7bb024f2..355a5fe357 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1769,12 +1769,13 @@ public class CoreOptions implements Serializable { + "a forced lookup compaction will be performed to flush L0 files to higher level. " + "This option is only valid when lookup-compact mode is gentle."); - public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM = - key("delete-file.thread-num") + public static final ConfigOption<Integer> FILE_OPERATION_THREAD_NUM = + key("file-operation.thread-num") .intType() .noDefaultValue() + .withFallbackKeys("delete-file.thread-num") .withDescription( - "The maximum number of concurrent deleting files. " + "The maximum number of concurrent file operations. " + "By default is the number of processors available to the Java virtual machine."); public static final ConfigOption<String> SCAN_FALLBACK_BRANCH = @@ -2266,8 +2267,8 @@ public class CoreOptions implements Serializable { return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES); } - public int deleteFileThreadNum() { - return options.getOptional(DELETE_FILE_THREAD_NUM) + public int fileOperationThreadNum() { + return options.getOptional(FILE_OPERATION_THREAD_NUM) .orElseGet(() -> Runtime.getRuntime().availableProcessors()); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java similarity index 89% rename from paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java rename to paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java index 638d6f9a4c..87c096c50d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileDeletionThreadPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileOperationThreadPool.java @@ -24,10 +24,10 @@ import java.util.concurrent.ThreadPoolExecutor; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; -/** Thread pool to delete files using {@link FileIO}. */ -public class FileDeletionThreadPool { +/** Thread pool to operate files using {@link FileIO}. */ +public class FileOperationThreadPool { - private static final String THREAD_NAME = "DELETE-FILE-THREAD-POOL"; + private static final String THREAD_NAME = "FILE-OPERATION-THREAD-POOL"; private static ThreadPoolExecutor executorService = createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME); diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 2d71f61580..f4a77f877d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -312,7 +312,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> { newStatsFileHandler(), options.changelogProducer() != CoreOptions.ChangelogProducer.NONE, options.cleanEmptyDirectories(), - options.deleteFileThreadNum()); + options.fileOperationThreadNum()); } @Override @@ -325,7 +325,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> { newIndexFileHandler(), newStatsFileHandler(), options.cleanEmptyDirectories(), - options.deleteFileThreadNum()); + options.fileOperationThreadNum()); } @Override @@ -343,7 +343,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> { newIndexFileHandler(), newStatsFileHandler(), options.cleanEmptyDirectories(), - options.deleteFileThreadNum()); + options.fileOperationThreadNum()); } public abstract Comparator<InternalRow> newKeyComparator(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index dae3d5a0f2..98a761b471 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -35,7 +35,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.DataFilePathFactories; -import org.apache.paimon.utils.FileDeletionThreadPool; +import org.apache.paimon.utils.FileOperationThreadPool; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -102,7 +102,7 @@ public abstract class FileDeletionBase<T extends Snapshot> { this.statsFileHandler = statsFileHandler; this.cleanEmptyDirectories = cleanEmptyDirectories; this.deletionBuckets = new HashMap<>(); - this.deleteFileExecutor = FileDeletionThreadPool.getExecutorService(deleteFileThreadNum); + this.deleteFileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java index a704a329c7..c47916871a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java @@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.FileOperationThreadPool; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.ThreadPoolUtils; @@ -38,8 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; -import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; - /** List what data files recorded in manifests are missing from the filesystem. */ public class ListUnexistingFiles { @@ -51,8 +50,8 @@ public class ListUnexistingFiles { this.table = table; this.pathFactory = table.store().pathFactory(); this.executor = - createCachedThreadPool( - table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES"); + FileOperationThreadPool.getExecutorService( + table.coreOptions().fileOperationThreadNum()); } public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 9456e6c9aa..34ff3fb721 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -89,7 +89,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean { this.deleteFiles = new ArrayList<>(); this.executor = createCachedThreadPool( - table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN"); + table.coreOptions().fileOperationThreadNum(), "ORPHAN_FILES_CLEAN"); this.dryRun = dryRun; } @@ -276,7 +276,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean { : new HashMap<String, String>() { { put( - CoreOptions.DELETE_FILE_THREAD_NUM.key(), + CoreOptions.FILE_OPERATION_THREAD_NUM.key(), parallelism.toString()); } }; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index c1d34a99ef..aa5b8bb70d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -455,7 +455,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable { new ConsumerManager(fileIO, path, snapshotManager().branch()), options.snapshotExpireExecutionMode(), name(), - options.forceCreatingSnapshot()); + options.forceCreatingSnapshot(), + options.fileOperationThreadNum()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 7f19c7443c..4f3bb5b71b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -36,6 +36,7 @@ import org.apache.paimon.tag.TagTimeExpire; import org.apache.paimon.utils.CompactedChangelogPathResolver; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.ExecutorThreadFactory; +import org.apache.paimon.utils.FileOperationThreadPool; import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -57,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -66,7 +68,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.paimon.CoreOptions.ExpireExecutionMode; import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER; -import static org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService; import static org.apache.paimon.utils.Preconditions.checkState; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn; @@ -79,18 +80,16 @@ public class TableCommitImpl implements InnerTableCommit { @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; @Nullable private final TagAutoManager tagAutoManager; - @Nullable private final Duration consumerExpireTime; private final ConsumerManager consumerManager; - private final ExecutorService maintainExecutor; private final AtomicReference<Throwable> maintainError; - private final String tableName; + private final boolean forceCreatingSnapshot; + private final ThreadPoolExecutor fileCheckExecutor; @Nullable private Map<String, String> overwritePartition = null; private boolean batchCommitted = false; - private final boolean forceCreatingSnapshot; private boolean expireForEmptyCommit = true; public TableCommitImpl( @@ -102,7 +101,8 @@ public class TableCommitImpl implements InnerTableCommit { ConsumerManager consumerManager, ExpireExecutionMode expireExecutionMode, String tableName, - boolean forceCreatingSnapshot) { + boolean forceCreatingSnapshot, + int threadNum) { if (partitionExpire != null) { commit.withPartitionExpire(partitionExpire); } @@ -125,6 +125,7 @@ public class TableCommitImpl implements InnerTableCommit { this.tableName = tableName; this.forceCreatingSnapshot = forceCreatingSnapshot; + this.fileCheckExecutor = FileOperationThreadPool.getExecutorService(threadNum); } public boolean forceCreatingSnapshot() { @@ -294,17 +295,12 @@ public class TableCommitImpl implements InnerTableCommit { msg.newFilesIncrement().newIndexFiles().stream() .map(indexFileFactory::toPath) .forEach(files::add); - msg.newFilesIncrement().deletedIndexFiles().stream() - .map(indexFileFactory::toPath) - .forEach(files::add); - msg.compactIncrement().compactBefore().forEach(collector); msg.compactIncrement().compactAfter().forEach(collector); msg.compactIncrement().newIndexFiles().stream() .map(indexFileFactory::toPath) .forEach(files::add); - msg.compactIncrement().deletedIndexFiles().stream() - .map(indexFileFactory::toPath) - .forEach(files::add); + + // skip compact before files, deleted index files } } @@ -329,7 +325,7 @@ public class TableCommitImpl implements InnerTableCommit { List<Path> nonExistFiles = Lists.newArrayList( randomlyExecuteSequentialReturn( - getExecutorService(null), + fileCheckExecutor, f -> nonExists.test(f) ? singletonList(f) : emptyList(), resolvedFiles)); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 0bc6e041ab..9d17593c11 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -751,7 +751,7 @@ public class FileDeletionTest { store.newStatsFileHandler(), store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE, store.options().cleanEmptyDirectories(), - store.options().deleteFileThreadNum()); + store.options().fileOperationThreadNum()); ExpireSnapshots expireSnapshots = new ExpireSnapshotsImpl( @@ -816,7 +816,7 @@ public class FileDeletionTest { store.newStatsFileHandler(), store.options().changelogProducer() != CoreOptions.ChangelogProducer.NONE, store.options().cleanEmptyDirectories(), - store.options().deleteFileThreadNum()); + store.options().fileOperationThreadNum()); ExpireSnapshots expireSnapshots = new ExpireSnapshotsImpl( snapshotManager, changelogManager, snapshotDeletion, tagManager);
