hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r918860086
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -391,22 +390,24 @@ private void flush() throws InterruptedException {
timestampOfRequest),
"Mark in-flight request as completed and
requeue %d request entries",
failedRequestEntries.size());
-
+ rateLimitingStrategy.registerInFlightRequest(requestInfo);
inFlightRequestsCount++;
- inFlightMessages += batchSize;
- submitRequestEntries(batch, requestResult);
+ submitRequestEntries(batch, requestResultCallback);
+ }
+
+ private int getNextBatchSize() {
+ return Math.min(rateLimitingStrategy.getMaxBatchSize(),
bufferedRequestEntries.size());
}
/**
* Creates the next batch of request entries while respecting the {@code
maxBatchSize} and
* {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
*/
- private List<RequestEntryT> createNextAvailableBatch() {
- int batchSize = Math.min(getNextBatchSizeLimit(),
bufferedRequestEntries.size());
- List<RequestEntryT> batch = new ArrayList<>(batchSize);
+ private List<RequestEntryT> createNextAvailableBatch(RequestInfo
requestInfo) {
+ List<RequestEntryT> batch = new ArrayList<>(requestInfo.batchSize);
int batchSizeBytes = 0;
Review Comment:
Sure, sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]