This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e6d97f5b25 IGNITE-20291 Fix in-flight counts for backpressure (#2509)
e6d97f5b25 is described below
commit e6d97f5b254d0569972fe03c21c15ad0c8e8a858
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Aug 29 12:03:23 2023 +0300
IGNITE-20291 Fix in-flight counts for backpressure (#2509)
* Fix an error when computing the number of in-flight batches
* Remove StreamerOptions#retryLimit
* Remove obsolete timer checks in StreamerBuffer
---
.../internal/client/table/ClientDataStreamer.java | 5 -
.../ignite/internal/streamer/StreamerBuffer.java | 38 ++---
.../ignite/internal/streamer/StreamerOptions.java | 8 -
.../internal/streamer/StreamerSubscriber.java | 53 +++---
.../internal/streamer/StreamerSubscriberTest.java | 179 +++++++++++++++++++++
.../apache/ignite/internal/table/DataStreamer.java | 5 -
6 files changed, 211 insertions(+), 77 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 842b564c94..93dc2057c4 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -64,11 +64,6 @@ class ClientDataStreamer {
public int autoFlushFrequency() {
return options.autoFlushFrequency();
}
-
- @Override
- public int retryLimit() {
- return options.retryLimit();
- }
};
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
index 15006f21e2..4289caf63b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java
@@ -19,24 +19,19 @@ package org.apache.ignite.internal.streamer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.Consumer;
class StreamerBuffer<T> {
private final int capacity;
- private final Function<List<T>, CompletableFuture<Void>> flusher;
+ private final Consumer<List<T>> flusher;
/** Primary buffer. Won't grow over capacity. */
private List<T> buf;
- private CompletableFuture<Void> flushFut;
-
private boolean closed;
- private long lastFlushTime;
-
- StreamerBuffer(int capacity, Function<List<T>, CompletableFuture<Void>>
flusher) {
+ StreamerBuffer(int capacity, Consumer<List<T>> flusher) {
this.capacity = capacity;
this.flusher = flusher;
buf = new ArrayList<>(capacity);
@@ -55,12 +50,12 @@ class StreamerBuffer<T> {
buf.add(item);
if (buf.size() >= capacity) {
- flush(buf);
+ flusher.accept(buf);
buf = new ArrayList<>(capacity);
}
}
- synchronized CompletableFuture<Void> flushAndClose() {
+ synchronized void flushAndClose() {
if (closed) {
throw new IllegalStateException("Streamer is already closed.");
}
@@ -68,31 +63,16 @@ class StreamerBuffer<T> {
closed = true;
if (!buf.isEmpty()) {
- flush(buf);
+ flusher.accept(buf);
}
-
- return flushFut == null ? CompletableFuture.completedFuture(null) :
flushFut;
}
- synchronized void flush(long period) {
+ synchronized void flush() {
if (closed || buf.isEmpty()) {
return;
}
- if (System.nanoTime() - lastFlushTime > period) {
- flush(buf);
- buf = new ArrayList<>(capacity);
- }
- }
-
- private void flush(List<T> b) {
- if (flushFut == null || flushFut.isDone()) {
- flushFut = flusher.apply(b);
- } else {
- // Chain flush futures to ensure the order of items.
- flushFut = flushFut.thenCompose(ignored -> flusher.apply(b));
- }
-
- lastFlushTime = System.nanoTime();
+ flusher.accept(buf);
+ buf = new ArrayList<>(capacity);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
index 9c99f82bd5..e9874bded3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerOptions.java
@@ -42,12 +42,4 @@ public interface StreamerOptions {
* @return Auto flush frequency.
*/
int autoFlushFrequency();
-
- /**
- * Gets the retry limit for a batch. If a batch fails to be sent to the
cluster, the streamer will retry it a number of times.
- * If all retries fail, the streamer will be aborted.
- *
- * @return Retry limit.
- */
- int retryLimit();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
index c6c2369ec6..b256915671 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerSubscriber.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.streamer;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
@@ -53,12 +53,12 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
private final AtomicInteger inFlightItemCount = new AtomicInteger();
- private final Set<CompletableFuture<Void>> pendingFuts =
ConcurrentHashMap.newKeySet();
-
// NOTE: This can accumulate empty buffers for stopped/failed nodes.
Cleaning up is not trivial in concurrent scenario.
// We don't expect thousands of node failures, so it should be fine.
private final ConcurrentHashMap<P, StreamerBuffer<T>> buffers = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<P, CompletableFuture<Void>> pendingRequests =
new ConcurrentHashMap<>();
+
private final IgniteLogger log;
private final StreamerMetricSink metrics;
@@ -125,7 +125,7 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
StreamerBuffer<T> buf = buffers.computeIfAbsent(
partition,
- p -> new StreamerBuffer<>(options.batchSize(), items ->
sendBatch(p, items)));
+ p -> new StreamerBuffer<>(options.batchSize(), items ->
enlistBatch(p, items)));
buf.add(item);
this.metrics.streamerItemsQueuedAdd(1);
@@ -154,32 +154,37 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
return completionFut;
}
- private CompletableFuture<Void> sendBatch(P partition, Collection<T>
batch) {
+ private void enlistBatch(P partition, Collection<T> batch) {
int batchSize = batch.size();
assert batchSize > 0 : "Batch size must be positive.";
- CompletableFuture<Void> fut = new CompletableFuture<>();
- pendingFuts.add(fut);
inFlightItemCount.addAndGet(batchSize);
metrics.streamerBatchesActiveAdd(1);
+ pendingRequests.compute(
+ partition,
+ // Chain existing futures to preserve request order.
+ (part, fut) -> fut == null ? sendBatch(part, batch) :
fut.thenCompose(v -> sendBatch(part, batch))
+ );
+ }
+
+ private CompletableFuture<Void> sendBatch(P partition, Collection<T>
batch) {
// If a connection fails, the batch goes to default connection thanks
to built-in retry mechanism.
try {
- batchSender.sendAsync(partition, batch).whenComplete((res, err) ->
{
+ return batchSender.sendAsync(partition, batch).whenComplete((res,
err) -> {
if (err != null) {
// Retry is handled by the sender (RetryPolicy in
ReliableChannel on the client, sendWithRetry on the server).
// If we get here, then retries are exhausted and we
should fail the streamer.
log.error("Failed to send batch to partition " + partition
+ ": " + err.getMessage(), err);
close(err);
} else {
+ int batchSize = batch.size();
+
this.metrics.streamerBatchesSentAdd(1);
this.metrics.streamerBatchesActiveAdd(-1);
this.metrics.streamerItemsSentAdd(batchSize);
this.metrics.streamerItemsQueuedAdd(-batchSize);
- fut.complete(null);
- pendingFuts.remove(fut);
-
inFlightItemCount.addAndGet(-batchSize);
requestMore();
@@ -191,8 +196,6 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
});
}
});
-
- return fut;
} catch (Exception e) {
log.error("Failed to send batch to partition " + partition + ": "
+ e.getMessage(), e);
close(e);
@@ -216,11 +219,9 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
}
if (throwable == null) {
- for (StreamerBuffer<T> buf : buffers.values()) {
- pendingFuts.add(buf.flushAndClose());
- }
+ buffers.values().forEach(StreamerBuffer::flushAndClose);
- var futs = pendingFuts.toArray(new CompletableFuture[0]);
+ var futs = pendingRequests.values().toArray(new
CompletableFuture[0]);
CompletableFuture.allOf(futs).whenComplete((res, err) -> {
if (err != null) {
@@ -262,7 +263,11 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
flushTimer = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(threadPrefix, log));
- flushTask = flushTimer.scheduleAtFixedRate(new PeriodicFlushTask(),
interval, interval, TimeUnit.MILLISECONDS);
+ flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers,
interval, interval, TimeUnit.MILLISECONDS);
+ }
+
+ private void flushBuffers() {
+ buffers.values().forEach(StreamerBuffer::flush);
}
private static StreamerMetricSink getMetrics(@Nullable StreamerMetricSink
metrics) {
@@ -290,16 +295,4 @@ public class StreamerSubscriber<T, P> implements
Subscriber<T> {
}
};
}
-
- /**
- * Periodically flushes buffers.
- */
- private class PeriodicFlushTask implements Runnable {
- @Override
- public void run() {
- for (StreamerBuffer<T> buf : buffers.values()) {
- buf.flush(options.autoFlushFrequency());
- }
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
new file mode 100644
index 0000000000..71a964d740
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/streamer/StreamerSubscriberTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.streamer;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+class StreamerSubscriberTest extends BaseIgniteAbstractTest {
+ private static class Metrics implements StreamerMetricSink {
+ private final LongAdder batchesSent = new LongAdder();
+ private final LongAdder itemsSent = new LongAdder();
+ private final LongAdder batchesActive = new LongAdder();
+ private final LongAdder itemsQueued = new LongAdder();
+
+ @Override
+ public void streamerBatchesSentAdd(long batches) {
+ batchesSent.add(batches);
+ }
+
+ @Override
+ public void streamerItemsSentAdd(long items) {
+ itemsSent.add(items);
+ }
+
+ @Override
+ public void streamerBatchesActiveAdd(long batches) {
+ batchesActive.add(batches);
+ }
+
+ @Override
+ public void streamerItemsQueuedAdd(long items) {
+ itemsQueued.add(items);
+ }
+ }
+
+ private static class Options implements StreamerOptions {
+ private final int batchSize;
+ private final int perNodeParallelOperations;
+ private final int autoFlushFrequency;
+
+ Options(int batchSize, int perNodeParallelOperations, int
autoFlushFrequency) {
+ this.batchSize = batchSize;
+ this.perNodeParallelOperations = perNodeParallelOperations;
+ this.autoFlushFrequency = autoFlushFrequency;
+ }
+
+ @Override
+ public int batchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public int perNodeParallelOperations() {
+ return perNodeParallelOperations;
+ }
+
+ @Override
+ public int autoFlushFrequency() {
+ return autoFlushFrequency;
+ }
+ }
+
+ /**
+ * Publisher that generates a {@code limit} number of items and dispatches
them in the same thread.
+ */
+ private static class LimitedPublisher<T> implements Publisher<T> {
+ private final long limit;
+
+ private final LongFunction<T> generator;
+
+ LimitedPublisher(long limit, LongFunction<T> generator) {
+ this.limit = limit;
+ this.generator = generator;
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber) {
+ subscriber.onSubscribe(new Subscription() {
+ private long produced = 0;
+
+ @Override
+ public void request(long n) {
+ if (produced == limit) {
+ produced++;
+ subscriber.onComplete();
+ } else if (produced < limit) {
+ for (int i = 0; i < Math.min(n, limit - produced);
i++) {
+ produced++;
+ subscriber.onNext(generator.apply(produced));
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+ }
+ }
+
+ /**
+ * Tests the backpressure algorithm when batch sending is stuck.
+ */
+ @Test
+ void testBackpressureWithDelay() {
+ long itemsCount = 10;
+
+ var metrics = new Metrics();
+
+ var options = new Options(2, 1, 1000);
+
+ long expectedBatches = itemsCount / options.batchSize;
+
+ var partitionProvider = new StreamerPartitionAwarenessProvider<Long,
String>() {
+ @Override
+ public String partition(Long item) {
+ return "foo";
+ }
+
+ @Override
+ public CompletableFuture<Void> refreshAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+
+ var sendFuture = new CompletableFuture<Void>();
+
+ var subscriber = new StreamerSubscriber<>(
+ (part, batch) -> sendFuture,
+ partitionProvider,
+ options,
+ log,
+ metrics
+ );
+
+ var publisher = new LimitedPublisher<>(itemsCount, i -> i);
+
+ publisher.subscribe(subscriber);
+
+ assertThat(metrics.batchesActive.longValue(), is(expectedBatches));
+ assertThat(metrics.batchesSent.longValue(), is(0L));
+ assertThat(metrics.itemsQueued.longValue(), is(itemsCount));
+ assertThat(metrics.itemsSent.longValue(), is(0L));
+
+ sendFuture.complete(null);
+
+ assertThat(subscriber.completionFuture(), willCompleteSuccessfully());
+
+ assertThat(metrics.batchesActive.longValue(), is(0L));
+ assertThat(metrics.batchesSent.longValue(), is(expectedBatches));
+ assertThat(metrics.itemsQueued.longValue(), is(0L));
+ assertThat(metrics.itemsSent.longValue(), is(itemsCount));
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
index 62a40f9a87..a82bfa6edd 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/DataStreamer.java
@@ -63,11 +63,6 @@ class DataStreamer {
public int autoFlushFrequency() {
return options0.autoFlushFrequency();
}
-
- @Override
- public int retryLimit() {
- return options0.retryLimit();
- }
};
}
}