This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e6d97f5b25 IGNITE-20291 Fix in-flight counts for backpressure (#2509)
e6d97f5b25 is described below

commit e6d97f5b254d0569972fe03c21c15ad0c8e8a858
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Aug 29 12:03:23 2023 +0300

    IGNITE-20291 Fix in-flight counts for backpressure (#2509)
    
    * Fix an error when computing the number of in-flight batches
    * Remove StreamerOptions#retryLimit
    * Remove obsolete timer checks in StreamerBuffer
---
 .../internal/client/table/ClientDataStreamer.java  |   5 -
 .../ignite/internal/streamer/StreamerBuffer.java   |  38 ++---
 .../ignite/internal/streamer/StreamerOptions.java  |   8 -
 .../internal/streamer/StreamerSubscriber.java      |  53 +++---
 .../internal/streamer/StreamerSubscriberTest.java  | 179 +++++++++++++++++++++
 .../apache/ignite/internal/table/DataStreamer.java |   5 -
 6 files changed, 211 insertions(+), 77 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 842b564c94..93dc2057c4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -64,11 +64,6 @@ class ClientDataStreamer {
             public int autoFlushFrequency() {
                 return options.autoFlushFrequency();
             }
-
-            @Override
-            public int retryLimit() {
-                return options.retryLimit();
-            }
         };
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
index 15006f21e2..4289caf63b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
@@ -19,24 +19,19 @@ package org.apache.ignite.internal.streamer;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 class StreamerBuffer<T> {
     private final int capacity;
 
-    private final Function<List<T>, CompletableFuture<Void>> flusher;
+    private final Consumer<List<T>> flusher;
 
     /** Primary buffer. Won't grow over capacity. */
     private List<T> buf;
 
-    private CompletableFuture<Void> flushFut;
-
     private boolean closed;
 
-    private long lastFlushTime;
-
-    StreamerBuffer(int capacity, Function<List<T>, CompletableFuture<Void>> 
flusher) {
+    StreamerBuffer(int capacity, Consumer<List<T>> flusher) {
         this.capacity = capacity;
         this.flusher = flusher;
         buf = new ArrayList<>(capacity);
@@ -55,12 +50,12 @@ class StreamerBuffer<T> {
         buf.add(item);
 
         if (buf.size() >= capacity) {
-            flush(buf);
+            flusher.accept(buf);
             buf = new ArrayList<>(capacity);
         }
     }
 
-    synchronized CompletableFuture<Void> flushAndClose() {
+    synchronized void flushAndClose() {
         if (closed) {
             throw new IllegalStateException("Streamer is already closed.");
         }
@@ -68,31 +63,16 @@ class StreamerBuffer<T> {
         closed = true;
 
         if (!buf.isEmpty()) {
-            flush(buf);
+            flusher.accept(buf);
         }
-
-        return flushFut == null ? CompletableFuture.completedFuture(null) : 
flushFut;
     }
 
-    synchronized void flush(long period) {
+    synchronized void flush() {
         if (closed || buf.isEmpty()) {
             return;
         }
 
-        if (System.nanoTime() - lastFlushTime > period) {
-            flush(buf);
-            buf = new ArrayList<>(capacity);
-        }
-    }
-
-    private void flush(List<T> b) {
-        if (flushFut == null || flushFut.isDone()) {
-            flushFut = flusher.apply(b);
-        } else {
-            // Chain flush futures to ensure the order of items.
-            flushFut = flushFut.thenCompose(ignored -> flusher.apply(b));
-        }
-
-        lastFlushTime = System.nanoTime();
+        flusher.accept(buf);
+        buf = new ArrayList<>(capacity);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
index 9c99f82bd5..e9874bded3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
@@ -42,12 +42,4 @@ public interface StreamerOptions {
      * @return Auto flush frequency.
      */
     int autoFlushFrequency();
-
-    /**
-     * Gets the retry limit for a batch. If a batch fails to be sent to the 
cluster, the streamer will retry it a number of times.
-     * If all retries fail, the streamer will be aborted.
-     *
-     * @return Retry limit.
-     */
-    int retryLimit();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index c6c2369ec6..b256915671 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.streamer;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
@@ -53,12 +53,12 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
 
     private final AtomicInteger inFlightItemCount = new AtomicInteger();
 
-    private final Set<CompletableFuture<Void>> pendingFuts = 
ConcurrentHashMap.newKeySet();
-
     // NOTE: This can accumulate empty buffers for stopped/failed nodes. 
Cleaning up is not trivial in concurrent scenario.
     // We don't expect thousands of node failures, so it should be fine.
     private final ConcurrentHashMap<P, StreamerBuffer<T>> buffers = new 
ConcurrentHashMap<>();
 
+    private final ConcurrentMap<P, CompletableFuture<Void>> pendingRequests = 
new ConcurrentHashMap<>();
+
     private final IgniteLogger log;
 
     private final StreamerMetricSink metrics;
@@ -125,7 +125,7 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
 
         StreamerBuffer<T> buf = buffers.computeIfAbsent(
                 partition,
-                p -> new StreamerBuffer<>(options.batchSize(), items -> 
sendBatch(p, items)));
+                p -> new StreamerBuffer<>(options.batchSize(), items -> 
enlistBatch(p, items)));
 
         buf.add(item);
         this.metrics.streamerItemsQueuedAdd(1);
@@ -154,32 +154,37 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
         return completionFut;
     }
 
-    private CompletableFuture<Void> sendBatch(P partition, Collection<T> 
batch) {
+    private void enlistBatch(P partition, Collection<T> batch) {
         int batchSize = batch.size();
         assert batchSize > 0 : "Batch size must be positive.";
 
-        CompletableFuture<Void> fut = new CompletableFuture<>();
-        pendingFuts.add(fut);
         inFlightItemCount.addAndGet(batchSize);
         metrics.streamerBatchesActiveAdd(1);
 
+        pendingRequests.compute(
+                partition,
+                // Chain existing futures to preserve request order.
+                (part, fut) -> fut == null ? sendBatch(part, batch) : 
fut.thenCompose(v -> sendBatch(part, batch))
+        );
+    }
+
+    private CompletableFuture<Void> sendBatch(P partition, Collection<T> 
batch) {
         // If a connection fails, the batch goes to default connection thanks 
to built-it retry mechanism.
         try {
-            batchSender.sendAsync(partition, batch).whenComplete((res, err) -> 
{
+            return batchSender.sendAsync(partition, batch).whenComplete((res, 
err) -> {
                 if (err != null) {
                     // Retry is handled by the sender (RetryPolicy in 
ReliableChannel on the client, sendWithRetry on the server).
                     // If we get here, then retries are exhausted and we 
should fail the streamer.
                     log.error("Failed to send batch to partition " + partition 
+ ": " + err.getMessage(), err);
                     close(err);
                 } else {
+                    int batchSize = batch.size();
+
                     this.metrics.streamerBatchesSentAdd(1);
                     this.metrics.streamerBatchesActiveAdd(-1);
                     this.metrics.streamerItemsSentAdd(batchSize);
                     this.metrics.streamerItemsQueuedAdd(-batchSize);
 
-                    fut.complete(null);
-                    pendingFuts.remove(fut);
-
                     inFlightItemCount.addAndGet(-batchSize);
                     requestMore();
 
@@ -191,8 +196,6 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
                     });
                 }
             });
-
-            return fut;
         } catch (Exception e) {
             log.error("Failed to send batch to partition " + partition + ": " 
+ e.getMessage(), e);
             close(e);
@@ -216,11 +219,9 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
         }
 
         if (throwable == null) {
-            for (StreamerBuffer<T> buf : buffers.values()) {
-                pendingFuts.add(buf.flushAndClose());
-            }
+            buffers.values().forEach(StreamerBuffer::flushAndClose);
 
-            var futs = pendingFuts.toArray(new CompletableFuture[0]);
+            var futs = pendingRequests.values().toArray(new 
CompletableFuture[0]);
 
             CompletableFuture.allOf(futs).whenComplete((res, err) -> {
                 if (err != null) {
@@ -262,7 +263,11 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
 
         flushTimer = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory(threadPrefix, log));
 
-        flushTask = flushTimer.scheduleAtFixedRate(new PeriodicFlushTask(), 
interval, interval, TimeUnit.MILLISECONDS);
+        flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers, 
interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    private void flushBuffers() {
+        buffers.values().forEach(StreamerBuffer::flush);
     }
 
     private static StreamerMetricSink getMetrics(@Nullable StreamerMetricSink 
metrics) {
@@ -290,16 +295,4 @@ public class StreamerSubscriber<T, P> implements 
Subscriber<T> {
             }
         };
     }
-
-    /**
-     * Periodically flushes buffers.
-     */
-    private class PeriodicFlushTask implements Runnable {
-        @Override
-        public void run() {
-            for (StreamerBuffer<T> buf : buffers.values()) {
-                buf.flush(options.autoFlushFrequency());
-            }
-        }
-    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
new file mode 100644
index 0000000000..71a964d740
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.streamer;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+class StreamerSubscriberTest extends BaseIgniteAbstractTest {
+    private static class Metrics implements StreamerMetricSink {
+        private final LongAdder batchesSent = new LongAdder();
+        private final LongAdder itemsSent = new LongAdder();
+        private final LongAdder batchesActive = new LongAdder();
+        private final LongAdder itemsQueued = new LongAdder();
+
+        @Override
+        public void streamerBatchesSentAdd(long batches) {
+            batchesSent.add(batches);
+        }
+
+        @Override
+        public void streamerItemsSentAdd(long items) {
+            itemsSent.add(items);
+        }
+
+        @Override
+        public void streamerBatchesActiveAdd(long batches) {
+            batchesActive.add(batches);
+        }
+
+        @Override
+        public void streamerItemsQueuedAdd(long items) {
+            itemsQueued.add(items);
+        }
+    }
+
+    private static class Options implements StreamerOptions {
+        private final int batchSize;
+        private final int perNodeParallelOperations;
+        private final int autoFlushFrequency;
+
+        Options(int batchSize, int perNodeParallelOperations, int 
autoFlushFrequency) {
+            this.batchSize = batchSize;
+            this.perNodeParallelOperations = perNodeParallelOperations;
+            this.autoFlushFrequency = autoFlushFrequency;
+        }
+
+        @Override
+        public int batchSize() {
+            return batchSize;
+        }
+
+        @Override
+        public int perNodeParallelOperations() {
+            return perNodeParallelOperations;
+        }
+
+        @Override
+        public int autoFlushFrequency() {
+            return autoFlushFrequency;
+        }
+    }
+
+    /**
+     * Publisher that generates a {@code limit} number of items and dispatches 
them in the same thread.
+     */
+    private static class LimitedPublisher<T> implements Publisher<T> {
+        private final long limit;
+
+        private final LongFunction<T> generator;
+
+        LimitedPublisher(long limit, LongFunction<T> generator) {
+            this.limit = limit;
+            this.generator = generator;
+        }
+
+        @Override
+        public void subscribe(Subscriber<? super T> subscriber) {
+            subscriber.onSubscribe(new Subscription() {
+                private long produced = 0;
+
+                @Override
+                public void request(long n) {
+                    if (produced == limit) {
+                        produced++;
+                        subscriber.onComplete();
+                    } else if (produced < limit) {
+                        for (int i = 0; i < Math.min(n, limit - produced); 
i++) {
+                            produced++;
+                            subscriber.onNext(generator.apply(produced));
+                        }
+                    }
+                }
+
+                @Override
+                public void cancel() {
+                }
+            });
+        }
+    }
+
+    /**
+     * Tests the backpressure algorithm when batch sending is stuck.
+     */
+    @Test
+    void testBackpressureWithDelay() {
+        long itemsCount = 10;
+
+        var metrics = new Metrics();
+
+        var options = new Options(2, 1, 1000);
+
+        long expectedBatches = itemsCount / options.batchSize;
+
+        var partitionProvider = new StreamerPartitionAwarenessProvider<Long, 
String>() {
+            @Override
+            public String partition(Long item) {
+                return "foo";
+            }
+
+            @Override
+            public CompletableFuture<Void> refreshAsync() {
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+
+        var sendFuture = new CompletableFuture<Void>();
+
+        var subscriber = new StreamerSubscriber<>(
+                (part, batch) -> sendFuture,
+                partitionProvider,
+                options,
+                log,
+                metrics
+        );
+
+        var publisher = new LimitedPublisher<>(itemsCount, i -> i);
+
+        publisher.subscribe(subscriber);
+
+        assertThat(metrics.batchesActive.longValue(), is(expectedBatches));
+        assertThat(metrics.batchesSent.longValue(), is(0L));
+        assertThat(metrics.itemsQueued.longValue(), is(itemsCount));
+        assertThat(metrics.itemsSent.longValue(), is(0L));
+
+        sendFuture.complete(null);
+
+        assertThat(subscriber.completionFuture(), willCompleteSuccessfully());
+
+        assertThat(metrics.batchesActive.longValue(), is(0L));
+        assertThat(metrics.batchesSent.longValue(), is(expectedBatches));
+        assertThat(metrics.itemsQueued.longValue(), is(0L));
+        assertThat(metrics.itemsSent.longValue(), is(itemsCount));
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index 62a40f9a87..a82bfa6edd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -63,11 +63,6 @@ class DataStreamer {
             public int autoFlushFrequency() {
                 return options0.autoFlushFrequency();
             }
-
-            @Override
-            public int retryLimit() {
-                return options0.retryLimit();
-            }
         };
     }
 }

Reply via email to