This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 27aeb5788d IGNITE-21748 Rename DataStreamerOptions.perNodeParallelOperations to perPartitionParallelOperations (#3420)
27aeb5788d is described below

commit 27aeb5788dcd18dbdfa53571c37f22bfede2a689
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Mar 15 18:40:01 2024 +0200

    IGNITE-21748 Rename DataStreamerOptions.perNodeParallelOperations to perPartitionParallelOperations (#3420)
    
    * Data streamer was changed to perform per-partition batching - rename the options accordingly.
    * Change the default from 4 to 1: there are multiple partitions per node, we don't want too many active requests.
---
 .../apache/ignite/table/DataStreamerOptions.java   | 31 +++++++++++-----------
 .../internal/client/table/ClientDataStreamer.java  |  4 +--
 .../org/apache/ignite/client/DataStreamerTest.java |  8 ++++--
 .../ignite/client/PartitionAwarenessTest.java      |  2 +-
 .../ignite/internal/streamer/StreamerOptions.java  |  4 +--
 .../internal/streamer/StreamerSubscriber.java      |  4 +--
 .../internal/streamer/StreamerSubscriberTest.java  | 10 +++----
 .../apache/ignite/internal/table/DataStreamer.java |  4 +--
 8 files changed, 35 insertions(+), 32 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
index 9d0b64a1ff..0ae7be0d01 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerOptions.java
@@ -26,7 +26,7 @@ public class DataStreamerOptions {
 
     private final int pageSize;
 
-    private final int perNodeParallelOperations;
+    private final int perPartitionParallelOperations;
 
     private final int autoFlushFrequency;
 
@@ -36,13 +36,13 @@ public class DataStreamerOptions {
      * Constructor.
      *
      * @param pageSize Page size.
-     * @param perNodeParallelOperations Per node parallel operations.
+     * @param perPartitionParallelOperations Per partition parallel operations.
      * @param autoFlushFrequency Auto flush frequency.
      * @param retryLimit Retry limit.
      */
-    private DataStreamerOptions(int pageSize, int perNodeParallelOperations, 
int autoFlushFrequency, int retryLimit) {
+    private DataStreamerOptions(int pageSize, int 
perPartitionParallelOperations, int autoFlushFrequency, int retryLimit) {
         this.pageSize = pageSize;
-        this.perNodeParallelOperations = perNodeParallelOperations;
+        this.perPartitionParallelOperations = perPartitionParallelOperations;
         this.autoFlushFrequency = autoFlushFrequency;
         this.retryLimit = retryLimit;
     }
@@ -66,12 +66,12 @@ public class DataStreamerOptions {
     }
 
     /**
-     * Gets the number of parallel operations per node (how many in-flight 
requests can be active for a given node).
+     * Gets the number of parallel operations per partition (how many 
in-flight requests can be active for a given partition).
      *
-     * @return Per node parallel operations.
+     * @return Per partition parallel operations.
      */
-    public int perNodeParallelOperations() {
-        return perNodeParallelOperations;
+    public int perPartitionParallelOperations() {
+        return perPartitionParallelOperations;
     }
 
     /**
@@ -100,7 +100,7 @@ public class DataStreamerOptions {
     public static class Builder {
         private int pageSize = 1000;
 
-        private int perNodeParallelOperations = 4;
+        private int perPartitionParallelOperations = 1;
 
         private int autoFlushFrequency = 5000;
 
@@ -123,18 +123,17 @@ public class DataStreamerOptions {
         }
 
         /**
-         * Sets the number of parallel operations per node (how many in-flight 
requests can be active for a given node).
+         * Sets the number of parallel operations per partition (how many 
in-flight requests can be active for a given partition).
          *
-         * @param perNodeParallelOperations Per node parallel operations.
+         * @param perPartitionParallelOperations Per partition parallel 
operations.
          * @return This builder instance.
          */
-        public Builder perNodeParallelOperations(int 
perNodeParallelOperations) {
-            // TODO IGNITE-21283: Rename to perPartitionParallelOperations?
-            if (perNodeParallelOperations <= 0) {
-                throw new IllegalArgumentException("Per node parallel 
operations must be positive: " + perNodeParallelOperations);
+        public Builder perPartitionParallelOperations(int 
perPartitionParallelOperations) {
+            if (perPartitionParallelOperations <= 0) {
+                throw new 
IllegalArgumentException("perPartitionParallelOperations must be positive: " + 
perPartitionParallelOperations);
             }
 
-            this.perNodeParallelOperations = perNodeParallelOperations;
+            this.perPartitionParallelOperations = 
perPartitionParallelOperations;
 
             return this;
         }
@@ -171,7 +170,7 @@ public class DataStreamerOptions {
          * @return Data streamer options.
          */
         public DataStreamerOptions build() {
-            return new DataStreamerOptions(pageSize, 
perNodeParallelOperations, autoFlushFrequency, retryLimit);
+            return new DataStreamerOptions(pageSize, 
perPartitionParallelOperations, autoFlushFrequency, retryLimit);
         }
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 2df4d6a9cc..3b436b7d2b 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -57,8 +57,8 @@ class ClientDataStreamer {
             }
 
             @Override
-            public int perNodeParallelOperations() {
-                return options.perNodeParallelOperations();
+            public int perPartitionParallelOperations() {
+                return options.perPartitionParallelOperations();
             }
 
             @Override
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
index e21a9130a4..8e82d92307 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/DataStreamerTest.java
@@ -176,7 +176,7 @@ public class DataStreamerTest extends 
AbstractClientTableTest {
         try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Tuple>>(ForkJoinPool.commonPool(), 
bufferSize)) {
             var options = DataStreamerOptions.builder()
                     .pageSize(bufferSize)
-                    .perNodeParallelOperations(1)
+                    .perPartitionParallelOperations(1)
                     .build();
 
             var streamerFut = view.streamData(publisher, options);
@@ -212,7 +212,11 @@ public class DataStreamerTest extends 
AbstractClientTableTest {
         CompletableFuture<Void> streamFut;
 
         try (var publisher = new SimplePublisher<Tuple>()) {
-            var options = DataStreamerOptions.builder().pageSize(2).build();
+            var options = DataStreamerOptions.builder()
+                    .pageSize(2)
+                    .perPartitionParallelOperations(4)
+                    .build();
+
             streamFut = view.streamData(publisher, options);
 
             for (long i = 0; i < 1000; i++) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 2cdc465125..b92ccd415f 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -541,7 +541,7 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
     public void testDataStreamerReceivesPartitionAssignmentUpdates() {
         DataStreamerOptions options = DataStreamerOptions.builder()
                 .pageSize(1)
-                .perNodeParallelOperations(1)
+                .perPartitionParallelOperations(1)
                 .autoFlushFrequency(50)
                 .build();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
index b802effebd..82a1a62d29 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
@@ -29,11 +29,11 @@ public interface StreamerOptions {
     int pageSize();
 
     /**
-     * Gets the number of parallel operations per node (how many in-flight 
requests can be active for a given node).
+     * Gets the number of parallel operations per partition (how many 
in-flight requests can be active for a given partition).
      *
-     * @return Per node parallel operations.
+     * @return Per partition parallel operations.
      */
-    int perNodeParallelOperations();
+    int perPartitionParallelOperations();
 
     /**
      * Gets the auto flush frequency, in milliseconds
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index d0f36b28e0..6a545cd07e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -236,9 +236,9 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<DataStreamerItem<T>>
 
     private void requestMore() {
         // This method controls backpressure. We won't get more items than we 
requested.
-        // The idea is to have perNodeParallelOperations batches in flight for 
every connection.
+        // The idea is to have perPartitionParallelOperations batches in 
flight for every partition.
         var pending = pendingItemCount.get();
-        var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() 
* options.perNodeParallelOperations();
+        var desiredInFlight = Math.max(1, buffers.size()) * options.pageSize() 
* options.perPartitionParallelOperations();
         var inFlight = inFlightItemCount.get();
         var count = desiredInFlight - inFlight - pending;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
index 11d56001c0..cfd6cecf1a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -62,12 +62,12 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest 
{
 
     private static class Options implements StreamerOptions {
         private final int batchSize;
-        private final int perNodeParallelOperations;
+        private final int perPartitionParallelOperations;
         private final int autoFlushFrequency;
 
-        Options(int batchSize, int perNodeParallelOperations, int 
autoFlushFrequency) {
+        Options(int batchSize, int perPartitionParallelOperations, int 
autoFlushFrequency) {
             this.batchSize = batchSize;
-            this.perNodeParallelOperations = perNodeParallelOperations;
+            this.perPartitionParallelOperations = 
perPartitionParallelOperations;
             this.autoFlushFrequency = autoFlushFrequency;
         }
 
@@ -77,8 +77,8 @@ class StreamerSubscriberTest extends BaseIgniteAbstractTest {
         }
 
         @Override
-        public int perNodeParallelOperations() {
-            return perNodeParallelOperations;
+        public int perPartitionParallelOperations() {
+            return perPartitionParallelOperations;
         }
 
         @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index 916768125f..d7ba837ca8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -56,8 +56,8 @@ class DataStreamer {
             }
 
             @Override
-            public int perNodeParallelOperations() {
-                return options0.perNodeParallelOperations();
+            public int perPartitionParallelOperations() {
+                return options0.perPartitionParallelOperations();
             }
 
             @Override

Reply via email to