echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915889897


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > > I know but using `whenComplete` won't work as `get()` is never called on the result future
   > 
   > Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future, unless it's already completed at the time that `whenComplete` is called), not when the result is requested.
   
   You're right, I was too quick with my answer: I confused `whenComplete` with `thenAccept`.
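
The behavior described in the quoted reply can be checked with a small standalone sketch. It is not the code from this PR: the class `WhenCompleteDemo`, its method names, and the one-permit `Semaphore` are hypothetical and only illustrate that a `whenComplete` callback runs when the future completes, without anyone calling `get()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink or of this PR.
public class WhenCompleteDemo {

    /** Registers a callback, completes the future, and never calls get(). */
    static String observeWithoutGet() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("not run");

        // The callback is only registered here; the future is not completed yet.
        future.whenComplete((value, error) -> seen.set(error == null ? value : "failed"));

        // Completing the future runs the callback on this (completing) thread.
        future.complete("written");
        return seen.get();
    }

    /** Registers the callback on a future that is already completed. */
    static String observeAlreadyCompleted() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("written");
        AtomicReference<String> seen = new AtomicReference<>("not run");

        // Already completed: the callback runs immediately on the calling thread.
        future.whenComplete((value, error) -> seen.set(error == null ? value : "failed"));
        return seen.get();
    }

    /** Assumed usage: release a permit from the callback, as a write path might. */
    static int permitsAfterAsyncWrite() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly();

        CompletableFuture<Void> result = new CompletableFuture<>();
        // Release on success and on failure alike; get() is never called.
        result.whenComplete((ignored, error) -> semaphore.release());

        result.complete(null);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(observeWithoutGet());
        System.out.println(observeAlreadyCompleted());
        System.out.println(permitsAfterAsyncWrite());
    }
}
```

If this holds for the real `send(record)` result as well, the permit could presumably be released from a `whenComplete` callback directly, without converting the `CompletionStage` to a `ListenableFuture` first.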



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
