This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e067aeae9d IGNITE-21494 Data Streamer: use one executor per node for 
flushTimer (#3562)
e067aeae9d is described below

commit e067aeae9d3d5f1d8eb5ab9816ce7ce4417c4d86
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Apr 9 11:19:45 2024 +0300

    IGNITE-21494 Data Streamer: use one executor per node for flushTimer (#3562)
    
    Use one shared lazily-initialized scheduled executor for data streamer auto 
flush functionality per node / client (instead of new executor per streamer).
---
 .../ignite/internal/client/ReliableChannel.java    | 44 ++++++++++++++++++----
 .../internal/client/table/ClientDataStreamer.java  |  7 +++-
 .../ignite/client/fakes/FakeInternalTable.java     |  6 +++
 .../internal/streamer/StreamerSubscriber.java      | 43 +++++++++++----------
 .../apache/ignite/internal/util/IgniteUtils.java   |  6 ++-
 .../internal/streamer/StreamerSubscriberTest.java  | 22 +++++++++++
 .../exec/rel/TableScanNodeExecutionTest.java       |  3 +-
 .../ignite/internal/table/ItColocationTest.java    |  3 +-
 .../apache/ignite/internal/table/DataStreamer.java | 11 +++++-
 .../ignite/internal/table/InternalTable.java       |  8 ++++
 .../internal/table/KeyValueBinaryViewImpl.java     |  2 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  2 +-
 .../internal/table/RecordBinaryViewImpl.java       |  2 +-
 .../ignite/internal/table/RecordViewImpl.java      |  4 +-
 .../internal/table/distributed/TableManager.java   | 40 +++++++++++++++++---
 .../distributed/storage/InternalTableImpl.java     | 13 ++++++-
 .../distributed/storage/InternalTableImplTest.java |  6 ++-
 .../apache/ignite/distributed/ItTxTestCluster.java |  3 +-
 .../table/impl/DummyInternalTableImpl.java         |  3 +-
 19 files changed, 179 insertions(+), 49 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index b4343351bd..26336a37f9 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static 
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
@@ -37,7 +38,9 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +58,8 @@ import org.apache.ignite.client.RetryPolicyContext;
 import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
 import 
org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -113,6 +118,10 @@ public final class ReliableChannel implements 
AutoCloseable {
     /** Cluster id from the first handshake. */
     private final AtomicReference<UUID> clusterId = new AtomicReference<>();
 
+    /** Scheduled executor for streamer flush. */
+    @Nullable
+    private ScheduledExecutorService streamerFlushExecutor;
+
     /**
      * Constructor.
      *
@@ -134,18 +143,21 @@ public final class ReliableChannel implements 
AutoCloseable {
 
     /** {@inheritDoc} */
     @Override
-    public synchronized void close() {
+    public synchronized void close() throws Exception {
         closed = true;
 
         List<ClientChannelHolder> holders = channels;
 
-        if (holders != null) {
-            for (ClientChannelHolder hld : holders) {
-                hld.close();
-            }
-        }
-
-        connMgr.stop();
+        IgniteUtils.closeAllManually(
+                () -> {
+                    if (holders != null) {
+                        for (ClientChannelHolder hld : holders) {
+                            hld.close();
+                        }
+                    }
+                },
+                connMgr::stop,
+                () -> shutdownAndAwaitTermination(streamerFlushExecutor, 10, 
TimeUnit.SECONDS));
     }
 
     /**
@@ -675,6 +687,22 @@ public final class ReliableChannel implements 
AutoCloseable {
         return partitionAssignmentTimestamp.get();
     }
 
+    /**
+     * Gets the data streamer flush scheduled executor.
+     *
+     * @return Streamer flush executor.
+     */
+    public synchronized ScheduledExecutorService streamerFlushExecutor() {
+        if (streamerFlushExecutor == null) {
+            streamerFlushExecutor = Executors.newSingleThreadScheduledExecutor(
+                    new NamedThreadFactory(
+                            "client-data-streamer-flush-" + hashCode(),
+                            ClientUtils.logger(clientCfg, 
ReliableChannel.class)));
+        }
+
+        return streamerFlushExecutor;
+    }
+
     @Nullable
     private static IgniteClientConnectionException 
unwrapConnectionException(Throwable err) {
         while (err instanceof CompletionException) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 3b436b7d2b..d846a029df 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -42,7 +42,12 @@ class ClientDataStreamer {
         IgniteLogger log = ClientUtils.logger(tbl.channel().configuration(), 
StreamerSubscriber.class);
         StreamerOptions streamerOpts = streamerOptions(options);
         StreamerSubscriber<R, Integer> subscriber = new StreamerSubscriber<>(
-                batchSender, partitionAwarenessProvider, streamerOpts, log, 
tbl.channel().metrics());
+                batchSender,
+                partitionAwarenessProvider,
+                streamerOpts,
+                tbl.channel().streamerFlushExecutor(),
+                log,
+                tbl.channel().metrics());
 
         publisher.subscribe(subscriber);
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2ca1275aae..8016953e97 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiConsumer;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -492,4 +493,9 @@ public class FakeInternalTable implements InternalTable {
     public @Nullable PendingComparableValuesTracker<Long, Void> 
getPartitionStorageIndexTracker(int partitionId) {
         return null;
     }
+
+    @Override
+    public ScheduledExecutorService streamerFlushExecutor() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 6a545cd07e..6072f27f4d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
@@ -33,8 +32,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.table.DataStreamerItem;
 import org.jetbrains.annotations.Nullable;
 
@@ -67,12 +64,14 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
 
     private final StreamerMetricSink metrics;
 
-    private @Nullable Flow.Subscription subscription;
+    private final ScheduledExecutorService flushExecutor;
 
-    private @Nullable ScheduledExecutorService flushTimer;
+    private @Nullable Flow.Subscription subscription;
 
     private @Nullable ScheduledFuture<?> flushTask;
 
+    private boolean closed;
+
     /**
      * Constructor.
      *
@@ -83,23 +82,26 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
             StreamerBatchSender<T, P> batchSender,
             StreamerPartitionAwarenessProvider<T, P> 
partitionAwarenessProvider,
             StreamerOptions options,
+            ScheduledExecutorService flushExecutor,
             IgniteLogger log,
             @Nullable StreamerMetricSink metrics) {
         assert batchSender != null;
         assert partitionAwarenessProvider != null;
         assert options != null;
+        assert flushExecutor != null;
         assert log != null;
 
         this.batchSender = batchSender;
         this.partitionAwarenessProvider = partitionAwarenessProvider;
         this.options = options;
+        this.flushExecutor = flushExecutor;
         this.log = log;
         this.metrics = getMetrics(metrics);
     }
 
     /** {@inheritDoc} */
     @Override
-    public void onSubscribe(Subscription subscription) {
+    public synchronized void onSubscribe(Subscription subscription) {
         if (this.subscription != null) {
             throw new IllegalStateException("Subscription is already set.");
         }
@@ -208,13 +210,9 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
         }
     }
 
-    private void close(@Nullable Throwable throwable) {
-        if (flushTimer != null) {
-            assert flushTask != null;
-
+    private synchronized void close(@Nullable Throwable throwable) {
+        if (flushTask != null) {
             flushTask.cancel(false);
-
-            IgniteUtils.shutdownAndAwaitTermination(flushTimer, 10, 
TimeUnit.SECONDS);
         }
 
         var s = subscription;
@@ -232,9 +230,15 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
         } else {
             completionFut.completeExceptionally(throwable);
         }
+
+        closed = true;
     }
 
-    private void requestMore() {
+    private synchronized void requestMore() {
+        if (closed || subscription == null) {
+            return;
+        }
+
         // This method controls backpressure. We won't get more items than we 
requested.
         // The idea is to have perPartitionParallelOperations batches in 
flight for every connection.
         var pending = pendingItemCount.get();
@@ -246,23 +250,22 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
             return;
         }
 
-        assert subscription != null;
         subscription.request(count);
         pendingItemCount.addAndGet(count);
     }
 
-    private void initFlushTimer() {
+    private synchronized void initFlushTimer() {
+        if (closed) {
+            return;
+        }
+
         int interval = options.autoFlushFrequency();
 
         if (interval <= 0) {
             return;
         }
 
-        String threadPrefix = "client-data-streamer-flush-" + hashCode();
-
-        flushTimer = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory(threadPrefix, log));
-
-        flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers, 
interval, interval, TimeUnit.MILLISECONDS);
+        flushTask = flushExecutor.scheduleAtFixedRate(this::flushBuffers, 
interval, interval, TimeUnit.MILLISECONDS);
     }
 
     private void flushBuffers() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1f9bf4e85f..439459111a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -522,7 +522,11 @@ public class IgniteUtils {
      * @param timeout the maximum time to wait for the {@code ExecutorService} 
to terminate
      * @param unit the time unit of the timeout argument
      */
-    public static void shutdownAndAwaitTermination(ExecutorService service, 
long timeout, TimeUnit unit) {
+    public static void shutdownAndAwaitTermination(@Nullable ExecutorService 
service, long timeout, TimeUnit unit) {
+        if (service == null) {
+            return;
+        }
+
         long halfTimeoutNanos = unit.toNanos(timeout) / 2;
 
         // Disable new tasks from being submitted
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index cfd6cecf1a..4fdd9bf62c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -23,16 +23,37 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.LongFunction;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.table.DataStreamerItem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 class StreamerSubscriberTest extends BaseIgniteAbstractTest {
+    private static ScheduledExecutorService flushExecutor;
+
+    @BeforeAll
+    public static void flushExecutorInit() {
+        flushExecutor = Executors.newSingleThreadScheduledExecutor(
+                new NamedThreadFactory("flushExecutor", 
Loggers.forClass(StreamerSubscriberTest.class)));
+    }
+
+    @AfterAll
+    public static void flushExecutorShutdown() {
+        if (flushExecutor != null) {
+            flushExecutor.shutdown();
+        }
+    }
+
     private static class Metrics implements StreamerMetricSink {
         private final LongAdder batchesSent = new LongAdder();
         private final LongAdder itemsSent = new LongAdder();
@@ -157,6 +178,7 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest 
{
                 (part, batch, deleted) -> sendFuture,
                 partitionProvider,
                 options,
+                flushExecutor,
                 log,
                 metrics
         );
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index ab0a3d5f48..9afaa4e1fd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -250,7 +250,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     ),
                     mock(TransactionInflights.class),
                     3_000,
-                    0
+                    0,
+                    null
             );
             this.dataAmount = dataAmount;
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 767846b6b8..7574e24f7e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -298,7 +298,8 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new 
SingleClusterNodeResolver(clusterNode)),
                 transactionInflights,
                 3_000,
-                0
+                0,
+                null
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index d7ba837ca8..fb6a6a73e4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
@@ -36,10 +37,16 @@ class DataStreamer {
             Publisher<DataStreamerItem<R>> publisher,
             @Nullable DataStreamerOptions options,
             StreamerBatchSender<R, Integer> batchSender,
-            StreamerPartitionAwarenessProvider<R, Integer> 
partitionAwarenessProvider) {
+            StreamerPartitionAwarenessProvider<R, Integer> 
partitionAwarenessProvider,
+            ScheduledExecutorService flushExecutor) {
         StreamerOptions streamerOpts = streamerOptions(options);
         StreamerSubscriber<R, Integer> subscriber = new StreamerSubscriber<>(
-                batchSender, partitionAwarenessProvider, streamerOpts, LOG, 
null);
+                batchSender,
+                partitionAwarenessProvider,
+                streamerOpts,
+                flushExecutor,
+                LOG,
+                null);
 
         publisher.subscribe(subscriber);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index b1a388623a..a59ebba8fc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -478,4 +479,11 @@ public interface InternalTable extends ManuallyCloseable {
      * @param partitionId Partition ID.
      */
     @Nullable PendingComparableValuesTracker<Long, Void> 
getPartitionStorageIndexTracker(int partitionId);
+
+    /**
+     * Gets the streamer flush executor service.
+     *
+     * @return Streamer flush executor.
+     */
+    ScheduledExecutorService streamerFlushExecutor();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 332a13671c..8c52d01f92 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -567,7 +567,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
                         schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId)
                 ));
 
-        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner, tbl.streamerFlushExecutor());
         return convertToPublicFuture(future);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 238cb8e320..78b719d879 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -706,7 +706,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
                         schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId)
                 ));
 
-        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner, tbl.streamerFlushExecutor());
         return convertToPublicFuture(future);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index b2d9ccc5cf..94903d0d2b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -492,7 +492,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
                         schemaVersion -> this.tbl.updateAll(mapToBinary(rows, 
schemaVersion, deleted), deleted, partitionId)
                 ));
 
-        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner, tbl.streamerFlushExecutor());
         return convertToPublicFuture(future);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 85d7da5d8f..711d12c8ae 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -582,7 +582,9 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
                         schemaVersion -> this.tbl.updateAll(marshal(items, 
schemaVersion, deleted), deleted, partitionId)
                 ));
 
-        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(
+                publisher, options, batchSender, partitioner, 
tbl.streamerFlushExecutor());
+
         return convertToPublicFuture(future);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 109adb66dc..dcc5088bd0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -407,9 +407,15 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final TransactionConfiguration txCfg;
 
+    private final String nodeName;
+
     private long implicitTransactionTimeout;
+
     private int attemptsObtainLock;
 
+    @Nullable
+    private ScheduledExecutorService streamerFlushExecutor;
+
     /**
      * Creates a new table manager.
      *
@@ -500,6 +506,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.lowWatermark = lowWatermark;
         this.transactionInflights = transactionInflights;
         this.txCfg = txCfg;
+        this.nodeName = nodeName;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1157,15 +1164,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             return;
         }
 
+        int shutdownTimeoutSeconds = 10;
+
         IgniteUtils.closeAllManually(
                 mvGc,
                 fullStateTransferIndexChooser,
                 sharedTxStateStorage,
-                () -> shutdownAndAwaitTermination(rebalanceScheduler, 10, 
TimeUnit.SECONDS),
-                () -> shutdownAndAwaitTermination(txStateStoragePool, 10, 
TimeUnit.SECONDS),
-                () -> shutdownAndAwaitTermination(txStateStorageScheduledPool, 
10, TimeUnit.SECONDS),
-                () -> shutdownAndAwaitTermination(scanRequestExecutor, 10, 
TimeUnit.SECONDS),
-                () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor, 
10, TimeUnit.SECONDS)
+                () -> shutdownAndAwaitTermination(rebalanceScheduler, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(txStateStoragePool, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(txStateStorageScheduledPool, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(scanRequestExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(streamerFlushExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS)
         );
     }
 
@@ -1320,7 +1330,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 tableRaftService,
                 transactionInflights,
                 implicitTransactionTimeout,
-                attemptsObtainLock
+                attemptsObtainLock,
+                this::streamerFlushExecutor
         );
 
         var table = new TableImpl(
@@ -2543,4 +2554,21 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         engine.dropMvTable(tableDescriptor.id());
     }
+
+    private synchronized ScheduledExecutorService streamerFlushExecutor() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            if (streamerFlushExecutor == null) {
+                streamerFlushExecutor = 
Executors.newSingleThreadScheduledExecutor(
+                        IgniteThreadFactory.create(nodeName, 
"streamer-flush-executor", LOG, STORAGE_WRITE));
+            }
+
+            return streamerFlushExecutor;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index cde6397855..fddd5a6d69 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -55,12 +55,14 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -131,6 +133,8 @@ public class InternalTableImpl implements InternalTable {
     /** Partitions. */
     private final int partitions;
 
+    private final Supplier<ScheduledExecutorService> streamerFlushExecutor;
+
     /** Table name. */
     private volatile String tableName;
 
@@ -217,7 +221,8 @@ public class InternalTableImpl implements InternalTable {
             TableRaftServiceImpl tableRaftService,
             TransactionInflights transactionInflights,
             long implicitTransactionTimeout,
-            int attemptsObtainLock
+            int attemptsObtainLock,
+            Supplier<ScheduledExecutorService> streamerFlushExecutor
     ) {
         this.tableName = tableName;
         this.tableId = tableId;
@@ -235,6 +240,7 @@ public class InternalTableImpl implements InternalTable {
         this.transactionInflights = transactionInflights;
         this.implicitTransactionTimeout = implicitTransactionTimeout;
         this.attemptsObtainLock = attemptsObtainLock;
+        this.streamerFlushExecutor = streamerFlushExecutor;
     }
 
     /** {@inheritDoc} */
@@ -2162,6 +2168,11 @@ public class InternalTableImpl implements InternalTable {
         return storageIndexTrackerByPartitionId.get(partitionId);
     }
 
+    @Override
+    public ScheduledExecutorService streamerFlushExecutor() {
+        return streamerFlushExecutor.get();
+    }
+
     /**
      * Updates the partition trackers, if there were previous ones, it closes 
them.
      *
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 0919a7c71a..b07518f94d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -74,7 +74,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
                 mock(TransactionInflights.class),
                 3_000,
-                0
+                0,
+                null
         );
 
         // Let's check the empty table.
@@ -123,7 +124,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
                 mock(TransactionInflights.class),
                 3_000,
-                0
+                0,
+                null
         );
 
         List<BinaryRowEx> originalRows = List.of(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a3ab14808e..edb40c765b 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -748,7 +748,8 @@ public class ItTxTestCluster {
                         new TableRaftServiceImpl(tableName, 1, clients, 
nodeResolver),
                         clientTransactionInflights,
                         500,
-                        0
+                        0,
+                        null
                 ),
                 new DummySchemaManagerImpl(schemaDescriptor),
                 clientTxManager.lockManager(),
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 2cce50cc5a..6a0f189414 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -256,7 +256,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 ),
                 transactionInflights,
                 3_000,
-                0
+                0,
+                null
         );
 
         RaftGroupService svc = 
tableRaftService().partitionRaftGroupService(PART_ID);

Reply via email to