This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e067aeae9d IGNITE-21494 Data Streamer: use one executor per node for
flushTimer (#3562)
e067aeae9d is described below
commit e067aeae9d3d5f1d8eb5ab9816ce7ce4417c4d86
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Apr 9 11:19:45 2024 +0300
IGNITE-21494 Data Streamer: use one executor per node for flushTimer (#3562)
Use one shared lazily-initialized scheduled executor for data streamer auto
flush functionality per node / client (instead of new executor per streamer).
---
.../ignite/internal/client/ReliableChannel.java | 44 ++++++++++++++++++----
.../internal/client/table/ClientDataStreamer.java | 7 +++-
.../ignite/client/fakes/FakeInternalTable.java | 6 +++
.../internal/streamer/StreamerSubscriber.java | 43 +++++++++++----------
.../apache/ignite/internal/util/IgniteUtils.java | 6 ++-
.../internal/streamer/StreamerSubscriberTest.java | 22 +++++++++++
.../exec/rel/TableScanNodeExecutionTest.java | 3 +-
.../ignite/internal/table/ItColocationTest.java | 3 +-
.../apache/ignite/internal/table/DataStreamer.java | 11 +++++-
.../ignite/internal/table/InternalTable.java | 8 ++++
.../internal/table/KeyValueBinaryViewImpl.java | 2 +-
.../ignite/internal/table/KeyValueViewImpl.java | 2 +-
.../internal/table/RecordBinaryViewImpl.java | 2 +-
.../ignite/internal/table/RecordViewImpl.java | 4 +-
.../internal/table/distributed/TableManager.java | 40 +++++++++++++++++---
.../distributed/storage/InternalTableImpl.java | 13 ++++++-
.../distributed/storage/InternalTableImplTest.java | 6 ++-
.../apache/ignite/distributed/ItTxTestCluster.java | 3 +-
.../table/impl/DummyInternalTableImpl.java | 3 +-
19 files changed, 179 insertions(+), 49 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index b4343351bd..26336a37f9 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
@@ -37,7 +38,9 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +58,8 @@ import org.apache.ignite.client.RetryPolicyContext;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import
org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -113,6 +118,10 @@ public final class ReliableChannel implements
AutoCloseable {
/** Cluster id from the first handshake. */
private final AtomicReference<UUID> clusterId = new AtomicReference<>();
+ /** Scheduled executor for streamer flush. */
+ @Nullable
+ private ScheduledExecutorService streamerFlushExecutor;
+
/**
* Constructor.
*
@@ -134,18 +143,21 @@ public final class ReliableChannel implements
AutoCloseable {
/** {@inheritDoc} */
@Override
- public synchronized void close() {
+ public synchronized void close() throws Exception {
closed = true;
List<ClientChannelHolder> holders = channels;
- if (holders != null) {
- for (ClientChannelHolder hld : holders) {
- hld.close();
- }
- }
-
- connMgr.stop();
+ IgniteUtils.closeAllManually(
+ () -> {
+ if (holders != null) {
+ for (ClientChannelHolder hld : holders) {
+ hld.close();
+ }
+ }
+ },
+ connMgr::stop,
+ () -> shutdownAndAwaitTermination(streamerFlushExecutor, 10,
TimeUnit.SECONDS));
}
/**
@@ -675,6 +687,22 @@ public final class ReliableChannel implements
AutoCloseable {
return partitionAssignmentTimestamp.get();
}
+ /**
+ * Gets the data streamer flush scheduled executor.
+ *
+ * @return Streamer flush executor.
+ */
+ public synchronized ScheduledExecutorService streamerFlushExecutor() {
+ if (streamerFlushExecutor == null) {
+ streamerFlushExecutor = Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory(
+ "client-data-streamer-flush-" + hashCode(),
+ ClientUtils.logger(clientCfg,
ReliableChannel.class)));
+ }
+
+ return streamerFlushExecutor;
+ }
+
@Nullable
private static IgniteClientConnectionException
unwrapConnectionException(Throwable err) {
while (err instanceof CompletionException) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 3b436b7d2b..d846a029df 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -42,7 +42,12 @@ class ClientDataStreamer {
IgniteLogger log = ClientUtils.logger(tbl.channel().configuration(),
StreamerSubscriber.class);
StreamerOptions streamerOpts = streamerOptions(options);
StreamerSubscriber<R, Integer> subscriber = new StreamerSubscriber<>(
- batchSender, partitionAwarenessProvider, streamerOpts, log,
tbl.channel().metrics());
+ batchSender,
+ partitionAwarenessProvider,
+ streamerOpts,
+ tbl.channel().streamerFlushExecutor(),
+ log,
+ tbl.channel().metrics());
publisher.subscribe(subscriber);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2ca1275aae..8016953e97 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -492,4 +493,9 @@ public class FakeInternalTable implements InternalTable {
public @Nullable PendingComparableValuesTracker<Long, Void>
getPartitionStorageIndexTracker(int partitionId) {
return null;
}
+
+ @Override
+ public ScheduledExecutorService streamerFlushExecutor() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index 6a545cd07e..6072f27f4d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
@@ -33,8 +32,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.table.DataStreamerItem;
import org.jetbrains.annotations.Nullable;
@@ -67,12 +64,14 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
private final StreamerMetricSink metrics;
- private @Nullable Flow.Subscription subscription;
+ private final ScheduledExecutorService flushExecutor;
- private @Nullable ScheduledExecutorService flushTimer;
+ private @Nullable Flow.Subscription subscription;
private @Nullable ScheduledFuture<?> flushTask;
+ private boolean closed;
+
/**
* Constructor.
*
@@ -83,23 +82,26 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
StreamerBatchSender<T, P> batchSender,
StreamerPartitionAwarenessProvider<T, P>
partitionAwarenessProvider,
StreamerOptions options,
+ ScheduledExecutorService flushExecutor,
IgniteLogger log,
@Nullable StreamerMetricSink metrics) {
assert batchSender != null;
assert partitionAwarenessProvider != null;
assert options != null;
+ assert flushExecutor != null;
assert log != null;
this.batchSender = batchSender;
this.partitionAwarenessProvider = partitionAwarenessProvider;
this.options = options;
+ this.flushExecutor = flushExecutor;
this.log = log;
this.metrics = getMetrics(metrics);
}
/** {@inheritDoc} */
@Override
- public void onSubscribe(Subscription subscription) {
+ public synchronized void onSubscribe(Subscription subscription) {
if (this.subscription != null) {
throw new IllegalStateException("Subscription is already set.");
}
@@ -208,13 +210,9 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
}
}
- private void close(@Nullable Throwable throwable) {
- if (flushTimer != null) {
- assert flushTask != null;
-
+ private synchronized void close(@Nullable Throwable throwable) {
+ if (flushTask != null) {
flushTask.cancel(false);
-
- IgniteUtils.shutdownAndAwaitTermination(flushTimer, 10,
TimeUnit.SECONDS);
}
var s = subscription;
@@ -232,9 +230,15 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
} else {
completionFut.completeExceptionally(throwable);
}
+
+ closed = true;
}
- private void requestMore() {
+ private synchronized void requestMore() {
+ if (closed || subscription == null) {
+ return;
+ }
+
// This method controls backpressure. We won't get more items than we
requested.
// The idea is to have perPartitionParallelOperations batches in
flight for every connection.
var pending = pendingItemCount.get();
@@ -246,23 +250,22 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
return;
}
- assert subscription != null;
subscription.request(count);
pendingItemCount.addAndGet(count);
}
- private void initFlushTimer() {
+ private synchronized void initFlushTimer() {
+ if (closed) {
+ return;
+ }
+
int interval = options.autoFlushFrequency();
if (interval <= 0) {
return;
}
- String threadPrefix = "client-data-streamer-flush-" + hashCode();
-
- flushTimer = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(threadPrefix, log));
-
- flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers,
interval, interval, TimeUnit.MILLISECONDS);
+ flushTask = flushExecutor.scheduleAtFixedRate(this::flushBuffers,
interval, interval, TimeUnit.MILLISECONDS);
}
private void flushBuffers() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1f9bf4e85f..439459111a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -522,7 +522,11 @@ public class IgniteUtils {
* @param timeout the maximum time to wait for the {@code ExecutorService}
to terminate
* @param unit the time unit of the timeout argument
*/
- public static void shutdownAndAwaitTermination(ExecutorService service,
long timeout, TimeUnit unit) {
+ public static void shutdownAndAwaitTermination(@Nullable ExecutorService
service, long timeout, TimeUnit unit) {
+ if (service == null) {
+ return;
+ }
+
long halfTimeoutNanos = unit.toNanos(timeout) / 2;
// Disable new tasks from being submitted
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index cfd6cecf1a..4fdd9bf62c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -23,16 +23,37 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongFunction;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.table.DataStreamerItem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class StreamerSubscriberTest extends BaseIgniteAbstractTest {
+ private static ScheduledExecutorService flushExecutor;
+
+ @BeforeAll
+ public static void flushExecutorInit() {
+ flushExecutor = Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory("flushExecutor",
Loggers.forClass(StreamerSubscriberTest.class)));
+ }
+
+ @AfterAll
+ public static void flushExecutorShutdown() {
+ if (flushExecutor != null) {
+ flushExecutor.shutdown();
+ }
+ }
+
private static class Metrics implements StreamerMetricSink {
private final LongAdder batchesSent = new LongAdder();
private final LongAdder itemsSent = new LongAdder();
@@ -157,6 +178,7 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest
{
(part, batch, deleted) -> sendFuture,
partitionProvider,
options,
+ flushExecutor,
log,
metrics
);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index ab0a3d5f48..9afaa4e1fd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -250,7 +250,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
),
mock(TransactionInflights.class),
3_000,
- 0
+ 0,
+ null
);
this.dataAmount = dataAmount;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 767846b6b8..7574e24f7e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -298,7 +298,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new
SingleClusterNodeResolver(clusterNode)),
transactionInflights,
3_000,
- 0
+ 0,
+ null
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index d7ba837ca8..fb6a6a73e4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
@@ -36,10 +37,16 @@ class DataStreamer {
Publisher<DataStreamerItem<R>> publisher,
@Nullable DataStreamerOptions options,
StreamerBatchSender<R, Integer> batchSender,
- StreamerPartitionAwarenessProvider<R, Integer>
partitionAwarenessProvider) {
+ StreamerPartitionAwarenessProvider<R, Integer>
partitionAwarenessProvider,
+ ScheduledExecutorService flushExecutor) {
StreamerOptions streamerOpts = streamerOptions(options);
StreamerSubscriber<R, Integer> subscriber = new StreamerSubscriber<>(
- batchSender, partitionAwarenessProvider, streamerOpts, LOG,
null);
+ batchSender,
+ partitionAwarenessProvider,
+ streamerOpts,
+ flushExecutor,
+ LOG,
+ null);
publisher.subscribe(subscriber);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index b1a388623a..a59ebba8fc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -478,4 +479,11 @@ public interface InternalTable extends ManuallyCloseable {
* @param partitionId Partition ID.
*/
@Nullable PendingComparableValuesTracker<Long, Void>
getPartitionStorageIndexTracker(int partitionId);
+
+ /**
+ * Gets the streamer flush executor service.
+ *
+ * @return Streamer flush executor.
+ */
+ ScheduledExecutorService streamerFlushExecutor();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 332a13671c..8c52d01f92 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -567,7 +567,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId)
));
- CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner, tbl.streamerFlushExecutor());
return convertToPublicFuture(future);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 238cb8e320..78b719d879 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -706,7 +706,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId)
));
- CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner, tbl.streamerFlushExecutor());
return convertToPublicFuture(future);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index b2d9ccc5cf..94903d0d2b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -492,7 +492,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
schemaVersion -> this.tbl.updateAll(mapToBinary(rows,
schemaVersion, deleted), deleted, partitionId)
));
- CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner, tbl.streamerFlushExecutor());
return convertToPublicFuture(future);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 85d7da5d8f..711d12c8ae 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -582,7 +582,9 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
schemaVersion -> this.tbl.updateAll(marshal(items,
schemaVersion, deleted), deleted, partitionId)
));
- CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(
+ publisher, options, batchSender, partitioner,
tbl.streamerFlushExecutor());
+
return convertToPublicFuture(future);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 109adb66dc..dcc5088bd0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -407,9 +407,15 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final TransactionConfiguration txCfg;
+ private final String nodeName;
+
private long implicitTransactionTimeout;
+
private int attemptsObtainLock;
+ @Nullable
+ private ScheduledExecutorService streamerFlushExecutor;
+
/**
* Creates a new table manager.
*
@@ -500,6 +506,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.lowWatermark = lowWatermark;
this.transactionInflights = transactionInflights;
this.txCfg = txCfg;
+ this.nodeName = nodeName;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1157,15 +1164,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return;
}
+ int shutdownTimeoutSeconds = 10;
+
IgniteUtils.closeAllManually(
mvGc,
fullStateTransferIndexChooser,
sharedTxStateStorage,
- () -> shutdownAndAwaitTermination(rebalanceScheduler, 10,
TimeUnit.SECONDS),
- () -> shutdownAndAwaitTermination(txStateStoragePool, 10,
TimeUnit.SECONDS),
- () -> shutdownAndAwaitTermination(txStateStorageScheduledPool,
10, TimeUnit.SECONDS),
- () -> shutdownAndAwaitTermination(scanRequestExecutor, 10,
TimeUnit.SECONDS),
- () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor,
10, TimeUnit.SECONDS)
+ () -> shutdownAndAwaitTermination(rebalanceScheduler,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(txStateStoragePool,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(txStateStorageScheduledPool,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(streamerFlushExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS)
);
}
@@ -1320,7 +1330,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
tableRaftService,
transactionInflights,
implicitTransactionTimeout,
- attemptsObtainLock
+ attemptsObtainLock,
+ this::streamerFlushExecutor
);
var table = new TableImpl(
@@ -2543,4 +2554,21 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
engine.dropMvTable(tableDescriptor.id());
}
+
+ private synchronized ScheduledExecutorService streamerFlushExecutor() {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ if (streamerFlushExecutor == null) {
+ streamerFlushExecutor =
Executors.newSingleThreadScheduledExecutor(
+ IgniteThreadFactory.create(nodeName,
"streamer-flush-executor", LOG, STORAGE_WRITE));
+ }
+
+ return streamerFlushExecutor;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index cde6397855..fddd5a6d69 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -55,12 +55,14 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -131,6 +133,8 @@ public class InternalTableImpl implements InternalTable {
/** Partitions. */
private final int partitions;
+ private final Supplier<ScheduledExecutorService> streamerFlushExecutor;
+
/** Table name. */
private volatile String tableName;
@@ -217,7 +221,8 @@ public class InternalTableImpl implements InternalTable {
TableRaftServiceImpl tableRaftService,
TransactionInflights transactionInflights,
long implicitTransactionTimeout,
- int attemptsObtainLock
+ int attemptsObtainLock,
+ Supplier<ScheduledExecutorService> streamerFlushExecutor
) {
this.tableName = tableName;
this.tableId = tableId;
@@ -235,6 +240,7 @@ public class InternalTableImpl implements InternalTable {
this.transactionInflights = transactionInflights;
this.implicitTransactionTimeout = implicitTransactionTimeout;
this.attemptsObtainLock = attemptsObtainLock;
+ this.streamerFlushExecutor = streamerFlushExecutor;
}
/** {@inheritDoc} */
@@ -2162,6 +2168,11 @@ public class InternalTableImpl implements InternalTable {
return storageIndexTrackerByPartitionId.get(partitionId);
}
+ @Override
+ public ScheduledExecutorService streamerFlushExecutor() {
+ return streamerFlushExecutor.get();
+ }
+
/**
* Updates the partition trackers, if there were previous ones, it closes
them.
*
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 0919a7c71a..b07518f94d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -74,7 +74,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
mock(TransactionInflights.class),
3_000,
- 0
+ 0,
+ null
);
// Let's check the empty table.
@@ -123,7 +124,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
mock(TransactionInflights.class),
3_000,
- 0
+ 0,
+ null
);
List<BinaryRowEx> originalRows = List.of(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a3ab14808e..edb40c765b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -748,7 +748,8 @@ public class ItTxTestCluster {
new TableRaftServiceImpl(tableName, 1, clients,
nodeResolver),
clientTransactionInflights,
500,
- 0
+ 0,
+ null
),
new DummySchemaManagerImpl(schemaDescriptor),
clientTxManager.lockManager(),
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 2cce50cc5a..6a0f189414 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -256,7 +256,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
),
transactionInflights,
3_000,
- 0
+ 0,
+ null
);
RaftGroupService svc =
tableRaftService().partitionRaftGroupService(PART_ID);