This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ed4eff4a981 IGNITE-26549 Async file io uses default pool (#6673)
ed4eff4a981 is described below
commit ed4eff4a98129de6c0eb991d66ff68f4e871d5e5
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Oct 1 19:13:40 2025 +0300
IGNITE-26549 Async file io uses default pool (#6673)
---
modules/file-io/build.gradle | 1 +
.../apache/ignite/internal/fileio/AsyncFileIo.java | 7 +++--
.../ignite/internal/fileio/AsyncFileIoFactory.java | 23 ++++++++++++++-
.../PersistentPageMemoryStorageEngine.java | 34 ++++++++++++++++++++--
4 files changed, 59 insertions(+), 6 deletions(-)
diff --git a/modules/file-io/build.gradle b/modules/file-io/build.gradle
index 0ca66c38f60..7f09511f3c5 100644
--- a/modules/file-io/build.gradle
+++ b/modules/file-io/build.gradle
@@ -20,6 +20,7 @@ apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
dependencies {
+ implementation libs.jetbrains.annotations
implementation project(':ignite-core')
testImplementation project(':ignite-core')
diff --git a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
index 536eed16444..eabed32c019 100644
--- a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
+++ b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.jetbrains.annotations.Nullable;
/**
* {@link FileIo} implementation based on {@link AsynchronousFileChannel}.
@@ -48,11 +50,12 @@ public class AsyncFileIo extends AbstractFileIo {
* Creates I/O implementation for specified file.
*
* @param filePath File path.
+ * @param asyncIoExecutor Callback executor. If the parameter is {@code null}, then the system default thread pool will be used.
* @param modes Open modes.
* @throws IOException If some I/O error occurs.
*/
- public AsyncFileIo(Path filePath, OpenOption... modes) throws IOException {
- ch = AsynchronousFileChannel.open(filePath, modes);
+ public AsyncFileIo(Path filePath, @Nullable ExecutorService
asyncIoExecutor, OpenOption... modes) throws IOException {
+ ch = AsynchronousFileChannel.open(filePath, Set.of(modes),
asyncIoExecutor);
}
/** {@inheritDoc} */
diff --git a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
index f3b09fc6b71..d1521af472b 100644
--- a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
+++ b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
@@ -20,14 +20,35 @@ package org.apache.ignite.internal.fileio;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
+import java.util.concurrent.ExecutorService;
+import org.jetbrains.annotations.Nullable;
/**
* {@link AsyncFileIo} factory.
*/
public class AsyncFileIoFactory implements FileIoFactory {
+ /** Async callback executor. {@code null} for system default pool. */
+ private final @Nullable ExecutorService asyncIoExecutor;
+
+ /**
+ * Constructor.
+ */
+ public AsyncFileIoFactory() {
+ this(null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param asyncIoExecutor Async callback executor or {@code null} to use the default pool.
+ */
+ public AsyncFileIoFactory(@Nullable ExecutorService asyncIoExecutor) {
+ this.asyncIoExecutor = asyncIoExecutor;
+ }
+
/** {@inheritDoc} */
@Override
public FileIo create(Path filePath, OpenOption... modes) throws
IOException {
- return new AsyncFileIo(filePath, modes);
+ return new AsyncFileIo(filePath, asyncIoExecutor, modes);
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 1e67d000783..c5cd187975a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -116,6 +116,15 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
private volatile ExecutorService destructionExecutor;
+ /**
+ * Executor service for performing asynchronous I/O operations.
+ *
+ * <p>
+ * This field is initialized when the engine is configured to use asynchronous file I/O.
+ * If the engine is configured to use synchronous I/O, this field remains {@code null}.
+ */
+ private volatile @Nullable ExecutorService asyncIoExecutor;
+
private final FailureManager failureManager;
private final LogSyncer logSyncer;
@@ -182,9 +191,24 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
int pageSize = engineConfig.pageSizeBytes().value();
try {
- FileIoFactory fileIoFactory =
engineConfig.checkpoint().useAsyncFileIoFactory().value()
- ? new AsyncFileIoFactory()
- : new RandomAccessFileIoFactory();
+ FileIoFactory fileIoFactory;
+
+ if (engineConfig.checkpoint().useAsyncFileIoFactory().value()) {
+ asyncIoExecutor = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors(),
+ 100,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create(igniteInstanceName,
"persistent-mv-async-io", LOG)
+ );
+
+ fileIoFactory = new AsyncFileIoFactory(asyncIoExecutor);
+ } else {
+ asyncIoExecutor = null;
+
+ fileIoFactory = new RandomAccessFileIoFactory();
+ }
filePageStoreManager =
createFilePageStoreManager(igniteInstanceName, storagePath, fileIoFactory,
pageSize, failureManager);
@@ -262,12 +286,16 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
ExecutorService destructionExecutor = this.destructionExecutor;
CheckpointManager checkpointManager = this.checkpointManager;
FilePageStoreManager filePageStoreManager =
this.filePageStoreManager;
+ ExecutorService asyncIoExecutor = this.asyncIoExecutor;
Stream<AutoCloseable> resources = Stream.of(
destructionExecutor == null
? null
: (AutoCloseable) () ->
shutdownAndAwaitTermination(destructionExecutor, 30, TimeUnit.SECONDS),
checkpointManager == null ? null : (AutoCloseable)
checkpointManager::stop,
+ asyncIoExecutor == null
+ ? null
+ : (AutoCloseable) () ->
shutdownAndAwaitTermination(asyncIoExecutor, 30, TimeUnit.SECONDS),
filePageStoreManager == null ? null : (AutoCloseable)
filePageStoreManager::stop
);