echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915754903
##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
}
}
+ /**
+ * Asynchronously write a record and deal with {@link
OutputFormatBase#maxConcurrentRequests}.
+ * To specify how a record is written, please override the {@link
OutputFormatBase#send(Object)}
+ * method.
+ */
@Override
- public void writeRecord(OUT record) throws IOException {
+ public final void writeRecord(OUT record) throws IOException {
checkAsyncErrors();
tryAcquire(1);
- final ListenableFuture<V> result;
+ final CompletionStage<V> result;
try {
result = send(record);
} catch (Throwable e) {
semaphore.release();
throw e;
}
- Futures.addCallback(result, callback);
+ Futures.addCallback(
+
completableFutureToListenableFuture(result.toCompletableFuture()),
+ callback,
+ Executors.directExecutor());
Review Comment:
> This is overly complicated.
>
> When you have a `CompletionStage` you can just apply a `whenComplete`.
with the callback being re-written to a `BiConsumer<V, Throwable>`.
I know but using `whenComplete` won't work as `get()` is never called on the
result future. And we don't call `get()` on this future because otherwise the
`writeRecord` method would become synchronous. This is why I set a listener so
that the `callback` is called and the method still stays asynchronous.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]