This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new d22c52f6dda [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
d22c52f6dda is described below

commit d22c52f6dda6d5aec37f50b2657293157ca40d96
Author: Hong Teoh <[email protected]>
AuthorDate: Thu Aug 11 05:04:31 2022 +0100

    [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
---
 .../flink/connector/base/sink/writer/AsyncSinkWriter.java |  2 +-
 .../connector/base/sink/writer/AsyncSinkWriterTest.java   | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 090504ab8bc..f7a649b1636 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -291,7 +291,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
                         INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE,
                         INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR,
                         maxBatchSize * maxInFlightRequests,
-                        maxBatchSize * maxInFlightRequests);
+                        maxBatchSize);
 
         this.metrics = context.metricGroup();
         this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 47c437e01e5..14d9924bdd0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -871,8 +871,19 @@ public class AsyncSinkWriterTest {
                         delayedStartLatch,
                         false);
 
+        // Write a single successful record to trigger increase in rateLimitingStrategy rate
+        // threshold
+        writeSingleElementUsingProcessingTimeTrigger(sink);
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
+        assertThat(res).isEqualTo(Arrays.asList(0, 4, 1, 2, 3));
+    }
+
+    private void writeSingleElementUsingProcessingTimeTrigger(AsyncSinkWriterImpl sink)
+            throws Exception {
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("0");
+        tpts.setCurrentTime(100L);
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -898,7 +909,7 @@ public class AsyncSinkWriterTest {
 
         delayedStartLatch.await();
         sink.write("4");
-        tpts.setCurrentTime(100L);
+        tpts.setCurrentTime(200L);
         blockedWriteLatch.countDown();
         es.shutdown();
         assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS))

Reply via email to