This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ed4eff4a981 IGNITE-26549 Async file io uses default pool (#6673)
ed4eff4a981 is described below

commit ed4eff4a98129de6c0eb991d66ff68f4e871d5e5
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Oct 1 19:13:40 2025 +0300

    IGNITE-26549 Async file io uses default pool (#6673)
---
 modules/file-io/build.gradle                       |  1 +
 .../apache/ignite/internal/fileio/AsyncFileIo.java |  7 +++--
 .../ignite/internal/fileio/AsyncFileIoFactory.java | 23 ++++++++++++++-
 .../PersistentPageMemoryStorageEngine.java         | 34 ++++++++++++++++++++--
 4 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/modules/file-io/build.gradle b/modules/file-io/build.gradle
index 0ca66c38f60..7f09511f3c5 100644
--- a/modules/file-io/build.gradle
+++ b/modules/file-io/build.gradle
@@ -20,6 +20,7 @@ apply from: "$rootDir/buildscripts/publishing.gradle"
 apply from: "$rootDir/buildscripts/java-junit5.gradle"
 
 dependencies {
+    implementation libs.jetbrains.annotations
     implementation project(':ignite-core')
 
     testImplementation project(':ignite-core')
diff --git 
a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
 
b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
index 536eed16444..eabed32c019 100644
--- 
a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
+++ 
b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIo.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link FileIo} implementation based on {@link AsynchronousFileChannel}.
@@ -48,11 +50,12 @@ public class AsyncFileIo extends AbstractFileIo {
      * Creates I/O implementation for specified file.
      *
      * @param filePath File path.
+     * @param asyncIoExecutor Callback executor. If the parameter is {@code 
null}, then the system default thread pool will be used.
      * @param modes Open modes.
      * @throws IOException If some I/O error occurs.
      */
-    public AsyncFileIo(Path filePath, OpenOption... modes) throws IOException {
-        ch = AsynchronousFileChannel.open(filePath, modes);
+    public AsyncFileIo(Path filePath, @Nullable ExecutorService 
asyncIoExecutor, OpenOption... modes) throws IOException {
+        ch = AsynchronousFileChannel.open(filePath, Set.of(modes), 
asyncIoExecutor);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
 
b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
index f3b09fc6b71..d1521af472b 100644
--- 
a/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
+++ 
b/modules/file-io/src/main/java/org/apache/ignite/internal/fileio/AsyncFileIoFactory.java
@@ -20,14 +20,35 @@ package org.apache.ignite.internal.fileio;
 import java.io.IOException;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
+import java.util.concurrent.ExecutorService;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link AsyncFileIo} factory.
  */
 public class AsyncFileIoFactory implements FileIoFactory {
+    /** Async callback executor. {@code null} for system default pool. */
+    private final @Nullable ExecutorService asyncIoExecutor;
+
+    /**
+     * Constructor.
+     */
+    public AsyncFileIoFactory() {
+        this(null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param asyncIoExecutor Async callback executor or {@code null} to use 
the default pool.
+     */
+    public AsyncFileIoFactory(@Nullable ExecutorService asyncIoExecutor) {
+        this.asyncIoExecutor = asyncIoExecutor;
+    }
+
     /** {@inheritDoc} */
     @Override
     public FileIo create(Path filePath, OpenOption... modes) throws 
IOException {
-        return new AsyncFileIo(filePath, modes);
+        return new AsyncFileIo(filePath, asyncIoExecutor, modes);
     }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 1e67d000783..c5cd187975a 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -116,6 +116,15 @@ public class PersistentPageMemoryStorageEngine extends 
AbstractPageMemoryStorage
 
     private volatile ExecutorService destructionExecutor;
 
+    /**
+     * Executor service for performing asynchronous I/O operations.
+     *
+     * <p>
+     * This field is initialized when the engine is configured to use 
asynchronous file I/O.
+     * If the engine is configured to use synchronous I/O, this field remains 
{@code null}.
+     */
+    private volatile @Nullable ExecutorService asyncIoExecutor;
+
     private final FailureManager failureManager;
 
     private final LogSyncer logSyncer;
@@ -182,9 +191,24 @@ public class PersistentPageMemoryStorageEngine extends 
AbstractPageMemoryStorage
         int pageSize = engineConfig.pageSizeBytes().value();
 
         try {
-            FileIoFactory fileIoFactory = 
engineConfig.checkpoint().useAsyncFileIoFactory().value()
-                    ? new AsyncFileIoFactory()
-                    : new RandomAccessFileIoFactory();
+            FileIoFactory fileIoFactory;
+
+            if (engineConfig.checkpoint().useAsyncFileIoFactory().value()) {
+                asyncIoExecutor = new ThreadPoolExecutor(
+                        Runtime.getRuntime().availableProcessors(),
+                        Runtime.getRuntime().availableProcessors(),
+                        100,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(),
+                        IgniteThreadFactory.create(igniteInstanceName, 
"persistent-mv-async-io", LOG)
+                );
+
+                fileIoFactory = new AsyncFileIoFactory(asyncIoExecutor);
+            } else {
+                asyncIoExecutor = null;
+
+                fileIoFactory = new RandomAccessFileIoFactory();
+            }
 
             filePageStoreManager = 
createFilePageStoreManager(igniteInstanceName, storagePath, fileIoFactory, 
pageSize, failureManager);
 
@@ -262,12 +286,16 @@ public class PersistentPageMemoryStorageEngine extends 
AbstractPageMemoryStorage
             ExecutorService destructionExecutor = this.destructionExecutor;
             CheckpointManager checkpointManager = this.checkpointManager;
             FilePageStoreManager filePageStoreManager = 
this.filePageStoreManager;
+            ExecutorService asyncIoExecutor = this.asyncIoExecutor;
 
             Stream<AutoCloseable> resources = Stream.of(
                     destructionExecutor == null
                             ? null
                             : (AutoCloseable) () -> 
shutdownAndAwaitTermination(destructionExecutor, 30, TimeUnit.SECONDS),
                     checkpointManager == null ? null : (AutoCloseable) 
checkpointManager::stop,
+                    asyncIoExecutor == null
+                            ? null
+                            : (AutoCloseable) () -> 
shutdownAndAwaitTermination(asyncIoExecutor, 30, TimeUnit.SECONDS),
                     filePageStoreManager == null ? null : (AutoCloseable) 
filePageStoreManager::stop
             );
 

Reply via email to