pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813954056
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
         addEntryToBuffer(elementConverter.apply(element, context), false);
-        flushIfAble();
+        nonBlockingFlush();
     }
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }
Review comment:
I think the code could work without the `tryYield()` call, with something more
like this:
```
private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
    lastSendTimestamp = requestStartTime;
    ackTime = System.currentTimeMillis();
    inFlightRequestsCount--;
    ListIterator<RequestEntryT> iterator =
            failedRequestEntries.listIterator(failedRequestEntries.size());
    while (iterator.hasPrevious()) {
        addEntryToBuffer(iterator.previous(), true);
    }
    nonBlockingFlush();
}
```
When `completeRequest()` executes, we are already inside a mail. If more mails
are queued after this `completeRequest()`, they will be executed independently
of the `tryYield()` call anyway, so there is no need to chain those mails
together.
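
To illustrate the point, here is a minimal sketch of a single-threaded mailbox. It is a toy model, not Flink's actual `MailboxExecutor`: `MailboxSketch`, `runMailboxLoop()`, `demo()` and the `log` list are hypothetical names made up for this example, and `completeRequest(int)` only records that it ran. It suggests that when the completion callback returns without yielding, the mailbox loop still picks up the remaining mails in order:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy single-threaded mailbox (hypothetical stand-in, not Flink's MailboxExecutor). */
class MailboxSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    /** Enqueues a mail to be run later by the mailbox loop. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Runs one queued mail if there is one; returns false when the mailbox is empty. */
    boolean tryYield() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    /** The task's main loop: drains the mailbox one mail at a time. */
    void runMailboxLoop() {
        while (tryYield()) {
            // Each iteration runs exactly one mail.
        }
    }

    /** Completion callback that does NOT call tryYield(); it records and returns. */
    void completeRequest(int requestId) {
        log.add("complete-" + requestId);
        // In the suggested version, nonBlockingFlush() would be called here.
    }

    /** Queues three completion mails and lets the loop run them. */
    static List<String> demo() {
        MailboxSketch mailbox = new MailboxSketch();
        for (int i = 1; i <= 3; i++) {
            final int requestId = i;
            mailbox.execute(() -> mailbox.completeRequest(requestId));
        }
        mailbox.runMailboxLoop();
        return mailbox.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [complete-1, complete-2, complete-3]
    }
}
```

In this model every completion is processed even though no mail ever yields from inside its own body, which is the behaviour the suggestion above relies on.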
--
This is an automated message from the Apache Git Service.