This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 27aeb5788d IGNITE-21748 Rename
DataStreamerOptions.perNodeParallelOperations to perPartitionParallelOperations
(#3420)
27aeb5788d is described below
commit 27aeb5788dcd18dbdfa53571c37f22bfede2a689
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Mar 15 18:40:01 2024 +0200
IGNITE-21748 Rename DataStreamerOptions.perNodeParallelOperations to
perPartitionParallelOperations (#3420)
* The data streamer now performs per-partition batching, so rename the
option accordingly.
* Change the default from 4 to 1: there are multiple partitions per node,
and we don't want too many active requests.
---
.../apache/ignite/table/DataStreamerOptions.java | 31 +++++++++++-----------
.../internal/client/table/ClientDataStreamer.java | 4 +--
.../org/apache/ignite/client/DataStreamerTest.java | 8 ++++--
.../ignite/client/PartitionAwarenessTest.java | 2 +-
.../ignite/internal/streamer/StreamerOptions.java | 4 +--
.../internal/streamer/StreamerSubscriber.java | 4 +--
.../internal/streamer/StreamerSubscriberTest.java | 10 +++----
.../apache/ignite/internal/table/DataStreamer.java | 4 +--
8 files changed, 35 insertions(+), 32 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
index 9d0b64a1ff..0ae7be0d01 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
@@ -26,7 +26,7 @@ public class DataStreamerOptions {
private final int pageSize;
- private final int perNodeParallelOperations;
+ private final int perPartitionParallelOperations;
private final int autoFlushFrequency;
@@ -36,13 +36,13 @@ public class DataStreamerOptions {
* Constructor.
*
* @param pageSize Page size.
- * @param perNodeParallelOperations Per node parallel operations.
+ * @param perPartitionParallelOperations Per partition parallel operations.
* @param autoFlushFrequency Auto flush frequency.
* @param retryLimit Retry limit.
*/
- private DataStreamerOptions(int pageSize, int perNodeParallelOperations,
int autoFlushFrequency, int retryLimit) {
+ private DataStreamerOptions(int pageSize, int
perPartitionParallelOperations, int autoFlushFrequency, int retryLimit) {
this.pageSize = pageSize;
- this.perNodeParallelOperations = perNodeParallelOperations;
+ this.perPartitionParallelOperations = perPartitionParallelOperations;
this.autoFlushFrequency = autoFlushFrequency;
this.retryLimit = retryLimit;
}
@@ -66,12 +66,12 @@ public class DataStreamerOptions {
}
/**
- * Gets the number of parallel operations per node (how many in-flight
requests can be active for a given node).
+ * Gets the number of parallel operations per partition (how many
in-flight requests can be active for a given partition).
*
* @return Per node parallel operations.
*/
- public int perNodeParallelOperations() {
- return perNodeParallelOperations;
+ public int perPartitionParallelOperations() {
+ return perPartitionParallelOperations;
}
/**
@@ -100,7 +100,7 @@ public class DataStreamerOptions {
public static class Builder {
private int pageSize = 1000;
- private int perNodeParallelOperations = 4;
+ private int perPartitionParallelOperations = 1;
private int autoFlushFrequency = 5000;
@@ -123,18 +123,17 @@ public class DataStreamerOptions {
}
/**
- * Sets the number of parallel operations per node (how many in-flight
requests can be active for a given node).
+ * Sets the number of parallel operations per partition (how many
in-flight requests can be active for a given partition).
*
- * @param perNodeParallelOperations Per node parallel operations.
+ * @param perPartitionParallelOperations Per partition parallel
operations.
* @return This builder instance.
*/
- public Builder perNodeParallelOperations(int
perNodeParallelOperations) {
- // TODO IGNITE-21283: Rename to perPartitionParallelOperations?
- if (perNodeParallelOperations <= 0) {
- throw new IllegalArgumentException("Per node parallel
operations must be positive: " + perNodeParallelOperations);
+ public Builder perPartitionParallelOperations(int
perPartitionParallelOperations) {
+ if (perPartitionParallelOperations <= 0) {
+ throw new
IllegalArgumentException("perPartitionParallelOperations must be positive: " +
perPartitionParallelOperations);
}
- this.perNodeParallelOperations = perNodeParallelOperations;
+ this.perPartitionParallelOperations =
perPartitionParallelOperations;
return this;
}
@@ -171,7 +170,7 @@ public class DataStreamerOptions {
* @return Data streamer options.
*/
public DataStreamerOptions build() {
- return new DataStreamerOptions(pageSize,
perNodeParallelOperations, autoFlushFrequency, retryLimit);
+ return new DataStreamerOptions(pageSize,
perPartitionParallelOperations, autoFlushFrequency, retryLimit);
}
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 2df4d6a9cc..3b436b7d2b 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -57,8 +57,8 @@ class ClientDataStreamer {
}
@Override
- public int perNodeParallelOperations() {
- return options.perNodeParallelOperations();
+ public int perPartitionParallelOperations() {
+ return options.perPartitionParallelOperations();
}
@Override
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
index e21a9130a4..8e82d92307 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
@@ -176,7 +176,7 @@ public class DataStreamerTest extends
AbstractClientTableTest {
try (var publisher = new
SubmissionPublisher<DataStreamerItem<Tuple>>(ForkJoinPool.commonPool(),
bufferSize)) {
var options = DataStreamerOptions.builder()
.pageSize(bufferSize)
- .perNodeParallelOperations(1)
+ .perPartitionParallelOperations(1)
.build();
var streamerFut = view.streamData(publisher, options);
@@ -212,7 +212,11 @@ public class DataStreamerTest extends
AbstractClientTableTest {
CompletableFuture<Void> streamFut;
try (var publisher = new SimplePublisher<Tuple>()) {
- var options = DataStreamerOptions.builder().pageSize(2).build();
+ var options = DataStreamerOptions.builder()
+ .pageSize(2)
+ .perPartitionParallelOperations(4)
+ .build();
+
streamFut = view.streamData(publisher, options);
for (long i = 0; i < 1000; i++) {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 2cdc465125..b92ccd415f 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -541,7 +541,7 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
public void testDataStreamerReceivesPartitionAssignmentUpdates() {
DataStreamerOptions options = DataStreamerOptions.builder()
.pageSize(1)
- .perNodeParallelOperations(1)
+ .perPartitionParallelOperations(1)
.autoFlushFrequency(50)
.build();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
index b802effebd..82a1a62d29 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
@@ -29,11 +29,11 @@ public interface StreamerOptions {
int pageSize();
/**
- * Gets the number of parallel operations per node (how many in-flight
requests can be active for a given node).
+ * Gets the number of parallel operations per partition (how many
in-flight requests can be active for a given partition).
*
* @return Per node parallel operations.
*/
- int perNodeParallelOperations();
+ int perPartitionParallelOperations();
/**
* Gets the auto flush frequency, in milliseconds
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index d0f36b28e0..6a545cd07e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -236,9 +236,9 @@ public class StreamerSubscriber<T, P> implements
Subscriber<DataStreamerItem<T>>
private void requestMore() {
// This method controls backpressure. We won't get more items than we
requested.
- // The idea is to have perNodeParallelOperations batches in flight for
every connection.
+ // The idea is to have perPartitionParallelOperations batches in
flight for every connection.
var pending = pendingItemCount.get();
- var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize()
* options.perNodeParallelOperations();
+ var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize()
* options.perPartitionParallelOperations();
var inFlight = inFlightItemCount.get();
var count = desiredInFlight - inFlight - pending;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index 11d56001c0..cfd6cecf1a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -62,12 +62,12 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest
{
private static class Options implements StreamerOptions {
private final int batchSize;
- private final int perNodeParallelOperations;
+ private final int perPartitionParallelOperations;
private final int autoFlushFrequency;
- Options(int batchSize, int perNodeParallelOperations, int
autoFlushFrequency) {
+ Options(int batchSize, int perPartitionParallelOperations, int
autoFlushFrequency) {
this.batchSize = batchSize;
- this.perNodeParallelOperations = perNodeParallelOperations;
+ this.perPartitionParallelOperations =
perPartitionParallelOperations;
this.autoFlushFrequency = autoFlushFrequency;
}
@@ -77,8 +77,8 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest {
}
@Override
- public int perNodeParallelOperations() {
- return perNodeParallelOperations;
+ public int perPartitionParallelOperations() {
+ return perPartitionParallelOperations;
}
@Override
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index 916768125f..d7ba837ca8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -56,8 +56,8 @@ class DataStreamer {
}
@Override
- public int perNodeParallelOperations() {
- return options0.perNodeParallelOperations();
+ public int perPartitionParallelOperations() {
+ return options0.perPartitionParallelOperations();
}
@Override