dannycranmer commented on a change in pull request #18880:
URL: https://github.com/apache/flink/pull/18880#discussion_r812065879



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -229,6 +233,53 @@ public AsyncSinkWriter(
             long maxBatchSizeInBytes,
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
+            List<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                new FixedRateLimitingStrategy(maxInFlightRequests * maxBatchSize),
+                states);
+    }
+
+    public AsyncSinkWriter(

Review comment:
       This constructor is not used; please delete it.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -217,6 +220,7 @@ public AsyncSinkWriter(
                 maxBatchSizeInBytes,
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes,
+                new FixedRateLimitingStrategy(maxInFlightRequests * maxBatchSize),

Review comment:
       It looks like this constructor is only used by tests. We should not add
source code purely for test convenience. Can you please remove this constructor
and call the main one from the test? The test code can create the
`FixedRateLimitingStrategy` itself.

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
##########
@@ -63,7 +64,8 @@ public void setup() {
                         1000 * 1024,
                         true,
                         "streamName",
-                        sinkProperties);
+                        sinkProperties,
+                        new FixedRateLimitingStrategy(16 * 50));

Review comment:
       Why is there no equivalent change for KDS? I would assume it has a similar test.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -339,7 +400,10 @@ private void flush() {
      * {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
      */
     private List<RequestEntryT> createNextAvailableBatch() {
-        int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
+        int batchSize =
+                Math.min(
+                        Math.min(maxBatchSize, bufferedRequestEntries.size()),
+                        rateLimitingStrategy.getInFlightMessagesLimit());

Review comment:
       `Math.min(maxBatchSize, bufferedRequestEntries.size(), rateLimitingStrategy.getInFlightMessagesLimit())`
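
   Note that `java.lang.Math.min` only accepts two arguments, so the flat
three-argument form would need either nesting or a small helper. A minimal
sketch of such a helper follows; `BatchSizeLimits` and `minOf` are hypothetical
names used for illustration and are not part of the PR or of Flink.

```java
import java.util.stream.IntStream;

/** Hypothetical helper; not part of the PR or of Flink. */
final class BatchSizeLimits {
    private BatchSizeLimits() {}

    /** Returns the smallest of the given values; at least one value is required. */
    static int minOf(int first, int... rest) {
        // Fold the remaining values into the first one using the two-argument Math.min.
        return IntStream.of(rest).reduce(first, Math::min);
    }
}
```

   With this in place the batch size could be written as
`BatchSizeLimits.minOf(maxBatchSize, bufferedRequestEntries.size(), rateLimitingStrategy.getInFlightMessagesLimit())`.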

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/FixedRateLimitingStrategy.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Constant Rate implementation of {@link RateLimitingStrategy}. */
+@PublicEvolving
+public final class FixedRateLimitingStrategy implements RateLimitingStrategy {

Review comment:
       Let's move this to the test package. It can be promoted to source later
if needed.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -116,7 +117,9 @@
     * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may
      * fail, which could then lead to data loss.
      */
-    private int inFlightRequestsCount;
+    private Integer inFlightRequestsCount;
+
+    private int inFlightMessages;

Review comment:
       Can you add Javadoc for this field? The other fields in the class are
well documented.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -366,10 +430,19 @@ private void flush() {
      *
      * @param failedRequestEntries requestEntries that need to be retried
      */
-    private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
+    private void completeRequest(
+            List<RequestEntryT> failedRequestEntries, int messages, long requestStartTime) {
         lastSendTimestamp = requestStartTime;
         ackTime = System.currentTimeMillis();
         inFlightRequestsCount--;
+        inFlightMessages -= messages;

Review comment:
       It appears as though `messages` is the total batch size. We re-enqueue
`failedRequestEntries` but do not do `inFlightMessages +=
failedRequestEntries.size()`.
   
   Please verify that the count stays correct when entries are retried.
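
   One way to check the accounting is with a small model of the counters. The
sketch below is not the PR's `AsyncSinkWriter`; `InFlightAccountingModel` and
its methods are hypothetical. It assumes that re-enqueued entries go back to
the buffer and are only counted as in flight again when they are resent. Under
that assumption, subtracting the full batch size on completion keeps the
counter balanced; whether the PR follows the same model is what needs
verifying.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the counters; not the PR's AsyncSinkWriter. */
final class InFlightAccountingModel {
    private final Deque<String> buffer = new ArrayDeque<>();
    private int inFlightMessages;

    void add(String entry) {
        buffer.addLast(entry);
    }

    /** Takes up to batchSize entries from the buffer and counts them as in flight. */
    List<String> send(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        inFlightMessages += batch.size();
        return batch;
    }

    /** Completes a request: the whole batch leaves flight, failures return to the buffer. */
    void complete(List<String> batch, List<String> failed) {
        inFlightMessages -= batch.size();
        // Re-enqueue at the front, preserving order; counted again on the next send().
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    int inFlightMessages() {
        return inFlightMessages;
    }

    int buffered() {
        return buffer.size();
    }
}
```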

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RateLimitingStrategy.java
##########
@@ -0,0 +1,38 @@
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Strategy limiting inflight flow rate of written <b>messages</b> downstream by {@link
+ * AsyncSinkWriter}, Strategy should increase limit with each acknowledged/successful request and
+ * decrease limit with each throttled request. It is the writer responsibility to classify failed
+ * requests into throttled requests or any other failure.
+ */
+@PublicEvolving
+public interface RateLimitingStrategy extends Serializable {
+    int getInFlightMessagesLimit();

Review comment:
       This name is too specific. I would suggest `getLimit()` or
`getRateLimit()`.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AIMDRateLimitingStrategy.java
##########
@@ -0,0 +1,65 @@
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Additive Increase/Multiplicative Decrease implementation of {@link RateLimitingStrategy}. */
+@PublicEvolving
+public final class AIMDRateLimitingStrategy implements RateLimitingStrategy {
+    private final int increaseRate;
+    private final double decreaseFactor;
+    private final int rateThreshold;
+
+    private final AtomicReference<Integer> inFlightMessages;
+
+    public AIMDRateLimitingStrategy(
+            int increaseRate, double decreaseFactor, int rateThreshold, int initialRate) {
+        Preconditions.checkArgument(
+                decreaseFactor < 1.0 && decreaseFactor > 0.0,
+                "Decrease factor must be between 0.0 and 1.0.");
+        Preconditions.checkArgument(increaseRate > 0, "Increase rate must be positive integer.");
+        Preconditions.checkArgument(
+                rateThreshold >= initialRate, "Initial rate must not exceed threshold.");
+
+        this.increaseRate = increaseRate;
+        this.decreaseFactor = decreaseFactor;
+        this.rateThreshold = rateThreshold;
+        this.inFlightMessages = new AtomicReference<>(initialRate);

Review comment:
       Why do we need an `AtomicReference` here?

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AIMDRateLimitingStrategyTest.java
##########
@@ -0,0 +1,113 @@
+package org.apache.flink.connector.base.sink.writer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/** Unit tests for {@link AIMDRateLimitingStrategy}. */
+public class AIMDRateLimitingStrategyTest {
+    private static final int INITIAL_RATE = 32;
+    private static final int INCREASE_RATE = 10;
+    private static final int THRESHOLD = 100;
+    private static final double DECREASE_RATE = 0.5;
+
+    @Test
+    public void testInitialRateIsSetByConstructor() {
+        AIMDRateLimitingStrategy rateLimitingStrategy =
+                new AIMDRateLimitingStrategy(INCREASE_RATE, DECREASE_RATE, THRESHOLD, INITIAL_RATE);
+
+        Assertions.assertThat(rateLimitingStrategy.getInFlightMessagesLimit())

Review comment:
       nit: These are more readable with a static import for `assertThat`.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -255,8 +306,9 @@ public AsyncSinkWriter(
         this.maxBatchSizeInBytes = maxBatchSizeInBytes;
         this.maxTimeInBufferMS = maxTimeInBufferMS;
         this.maxRecordSizeInBytes = maxRecordSizeInBytes;
-
+        this.rateLimitingStrategy = rateLimitingStrategy;

Review comment:
       Add `Preconditions.checkNotNull(rateLimitingStrategy);`
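
   A minimal sketch of the fail-fast check in a constructor. It uses
`java.util.Objects.requireNonNull` from the JDK so that it runs without Flink
on the classpath; `WriterSketch` and its nested interface are hypothetical
stand-ins, not the PR's classes.

```java
import java.util.Objects;

/** Hypothetical stand-in for the writer; only the null check is illustrated. */
final class WriterSketch {
    /** Stand-in for the PR's interface, reduced to the one method used here. */
    interface RateLimitingStrategy {
        int getInFlightMessagesLimit();
    }

    private final RateLimitingStrategy rateLimitingStrategy;

    WriterSketch(RateLimitingStrategy rateLimitingStrategy) {
        // Fail at construction time rather than on the first flush.
        this.rateLimitingStrategy =
                Objects.requireNonNull(
                        rateLimitingStrategy, "rateLimitingStrategy must not be null");
    }

    int limit() {
        return rateLimitingStrategy.getInFlightMessagesLimit();
    }
}
```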

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -229,6 +233,53 @@ public AsyncSinkWriter(
             long maxBatchSizeInBytes,
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
+            List<BufferedRequestState<RequestEntryT>> states) {

Review comment:
       This constructor is not used; please delete it.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RateLimitingStrategy.java
##########
@@ -0,0 +1,38 @@
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Strategy limiting inflight flow rate of written <b>messages</b> downstream by {@link
+ * AsyncSinkWriter}, Strategy should increase limit with each acknowledged/successful request and
+ * decrease limit with each throttled request. It is the writer responsibility to classify failed
+ * requests into throttled requests or any other failure.
+ */
+@PublicEvolving
+public interface RateLimitingStrategy extends Serializable {
+    int getInFlightMessagesLimit();
+
+    void updateWithAcknowledgedRequests();
+
+    void updateWithFailedRequests();

Review comment:
       These names again do not sit well with the more generic
`RateLimitingStrategy`. I would prefer more generic verbs such as `scaleUp()`
and `scaleDown()`, or `speedUp()` and `speedDown()`.
   
   Did you check whether there are any other open source rate limiting
interfaces you could take inspiration from?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -116,7 +117,9 @@
     * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may
      * fail, which could then lead to data loss.
      */
-    private int inFlightRequestsCount;
+    private Integer inFlightRequestsCount;

Review comment:
       Why does this need to be nullable? Surely we can safely initialise to 0?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -229,6 +233,53 @@ public AsyncSinkWriter(
             long maxBatchSizeInBytes,
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
+            List<BufferedRequestState<RequestEntryT>> states) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                new FixedRateLimitingStrategy(maxInFlightRequests * maxBatchSize),
+                states);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                rateLimitingStrategy,
+                Collections.emptyList());
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            RateLimitingStrategy rateLimitingStrategy,

Review comment:
       Do we really need 4 public constructors? It looks like this is the only
one that is actually used.
   
   The parameter count is getting high now. As I said before, we should wrap
the configs into a config object; consider adding a builder here too. Please
create a follow-up for this, there is no need to bloat this PR.
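
   To illustrate what the follow-up could look like, here is a rough sketch of
a config object with a builder. The class name `AsyncSinkWriterConfig`, the
setter names and the validation are all assumptions made for illustration; only
the field names are taken from the constructor parameters quoted above.

```java
/** Hypothetical config object; the name and API are assumptions, not part of the PR. */
final class AsyncSinkWriterConfig {
    final int maxBatchSize;
    final int maxInFlightRequests;
    final int maxBufferedRequests;
    final long maxBatchSizeInBytes;
    final long maxTimeInBufferMS;
    final long maxRecordSizeInBytes;

    private AsyncSinkWriterConfig(Builder b) {
        this.maxBatchSize = b.maxBatchSize;
        this.maxInFlightRequests = b.maxInFlightRequests;
        this.maxBufferedRequests = b.maxBufferedRequests;
        this.maxBatchSizeInBytes = b.maxBatchSizeInBytes;
        this.maxTimeInBufferMS = b.maxTimeInBufferMS;
        this.maxRecordSizeInBytes = b.maxRecordSizeInBytes;
    }

    static Builder builder() {
        return new Builder();
    }

    /** Collects the settings one by one so that call sites name each value they pass. */
    static final class Builder {
        private int maxBatchSize;
        private int maxInFlightRequests;
        private int maxBufferedRequests;
        private long maxBatchSizeInBytes;
        private long maxTimeInBufferMS;
        private long maxRecordSizeInBytes;

        Builder setMaxBatchSize(int value) {
            this.maxBatchSize = value;
            return this;
        }

        Builder setMaxInFlightRequests(int value) {
            this.maxInFlightRequests = value;
            return this;
        }

        Builder setMaxBufferedRequests(int value) {
            this.maxBufferedRequests = value;
            return this;
        }

        Builder setMaxBatchSizeInBytes(long value) {
            this.maxBatchSizeInBytes = value;
            return this;
        }

        Builder setMaxTimeInBufferMS(long value) {
            this.maxTimeInBufferMS = value;
            return this;
        }

        Builder setMaxRecordSizeInBytes(long value) {
            this.maxRecordSizeInBytes = value;
            return this;
        }

        AsyncSinkWriterConfig build() {
            // Illustrative validation only; the real constraints may differ.
            if (maxBatchSize <= 0 || maxInFlightRequests <= 0 || maxBufferedRequests <= 0) {
                throw new IllegalArgumentException(
                        "Batch, in-flight and buffer limits must be positive.");
            }
            return new AsyncSinkWriterConfig(this);
        }
    }
}
```

   The writer would then need a single constructor taking the element
converter, the context, the config and the rate limiting strategy.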

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AIMDRateLimitingStrategy.java
##########
@@ -0,0 +1,65 @@
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Additive Increase/Multiplicative Decrease implementation of {@link RateLimitingStrategy}. */
+@PublicEvolving
+public final class AIMDRateLimitingStrategy implements RateLimitingStrategy {
+    private final int increaseRate;
+    private final double decreaseFactor;
+    private final int rateThreshold;
+
+    private final AtomicReference<Integer> inFlightMessages;
+
+    public AIMDRateLimitingStrategy(

Review comment:
       Can you briefly document somewhere (class or constructor comment) the
purpose of the parameters `increaseRate`, `decreaseFactor`, `rateThreshold` and
`initialRate`?
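
   As a starting point, here is a sketch of how the parameters might be
documented, based on the usual meaning of additive increase/multiplicative
decrease. It is not the PR's implementation: `AimdSketch` is a hypothetical
class, the method names `getLimit`, `scaleUp` and `scaleDown` are borrowed from
the naming suggestions elsewhere in this review, and the floor of 1 on the
limit is an assumption.

```java
/** Hypothetical AIMD sketch; not the PR's AIMDRateLimitingStrategy. */
final class AimdSketch {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;
    private int limit;

    /**
     * @param increaseRate amount added to the limit after each successful request
     * @param decreaseFactor factor in (0, 1) the limit is multiplied by after a throttled request
     * @param rateThreshold upper bound that the limit never exceeds
     * @param initialRate limit to start from; must not exceed rateThreshold
     */
    AimdSketch(int increaseRate, double decreaseFactor, int rateThreshold, int initialRate) {
        if (!(decreaseFactor > 0.0 && decreaseFactor < 1.0)) {
            throw new IllegalArgumentException("Decrease factor must be between 0.0 and 1.0.");
        }
        if (increaseRate <= 0) {
            throw new IllegalArgumentException("Increase rate must be a positive integer.");
        }
        if (initialRate > rateThreshold) {
            throw new IllegalArgumentException("Initial rate must not exceed threshold.");
        }
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
        this.limit = initialRate;
    }

    int getLimit() {
        return limit;
    }

    /** Additive increase, capped at the threshold. */
    void scaleUp() {
        limit = Math.min(limit + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease, truncated to an int and floored at 1 (an assumption). */
    void scaleDown() {
        limit = Math.max(1, (int) (limit * decreaseFactor));
    }
}
```

   The sketch uses a plain `int` for the limit, which assumes the strategy is
only ever called from a single thread.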




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.