[ 
https://issues.apache.org/jira/browse/KAFKA-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499565#comment-16499565
 ] 

ASF GitHub Bot commented on KAFKA-6900:
---------------------------------------

RichoDemus closed pull request #5080: KAFKA-6900: Add thenCompose to KafkaFuture
URL: https://github.com/apache/kafka/pull/5080
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01dc42..d7ee5453e39 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -115,6 +115,15 @@ private void maybeComplete() {
      */
     public abstract <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function);
 
+    /**
+     * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
+     * futures's result as the argument to the supplied function.
+     *
+     * The function may be invoked by the thread that calls {@code thenApply} 
or it may be invoked by the thread that
+     * completes the future.
+     */
+    public abstract <R> KafkaFuture<R> thenCompose(BaseFunction<T, 
KafkaFuture<R>> function);
+
     /**
      * @see KafkaFuture#thenApply(BaseFunction)
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac952a..f534fe3de97 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.internals;
 
+import org.apache.kafka.common.KafkaFuture;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
@@ -23,8 +25,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.common.KafkaFuture;
-
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
  * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
@@ -70,6 +70,35 @@ public void accept(A a, Throwable exception) {
         }
     }
 
+    private static class FutureApplicant<A, B> implements BiConsumer<A, 
Throwable> {
+        private final BaseFunction<A, KafkaFuture<B>> function;
+        private final KafkaFutureImpl<B> future;
+
+        FutureApplicant(BaseFunction<A, KafkaFuture<B>> function, 
KafkaFutureImpl<B> future) {
+            this.function = function;
+            this.future = future;
+        }
+
+        @Override
+        public void accept(A a, Throwable exception) {
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                final KafkaFuture<B> b = function.apply(a);
+                b.whenComplete(new BiConsumer<B, Throwable>() {
+                    @Override
+                    public void accept(B result, Throwable error) {
+                        if (error != null) {
+                            future.completeExceptionally(error);
+                        } else {
+                            future.complete(result);
+                        }
+                    }
+                });
+            }
+        }
+    }
+
     private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
         private R value = null;
         private Throwable exception = null;
@@ -146,6 +175,13 @@ R await(long timeout, TimeUnit unit)
         return future;
     }
 
+    @Override
+    public <R> KafkaFuture<R> thenCompose(BaseFunction<T, KafkaFuture<R>> 
function) {
+        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
+        addWaiter(new FutureApplicant<>(function, future));
+        return future;
+    }
+
     public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> 
function) {
         KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
         futureImpl.addWaiter(new Applicant<>(function, this));
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java 
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 6f9efca7c66..ebc8ab30d61 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -27,10 +27,12 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * A unit test for KafkaFuture.
@@ -124,6 +126,88 @@ public Integer apply(Integer integer) {
         assertTrue(futureAppliedFail.isCompletedExceptionally());
     }
 
+    @Test
+    public void shouldComposeFuture() throws ExecutionException, 
InterruptedException {
+        final KafkaFuture<Integer> one = KafkaFuture.completedFuture(1);
+        final KafkaFuture<Integer> two = one.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() {
+            @Override
+            public KafkaFuture<Integer> apply(Integer integer) {
+                return KafkaFuture.completedFuture(integer + 1);
+            }
+        });
+        final int result = two.get();
+        assertEquals("The future's value should be two", 2, result);
+    }
+
+    @Test
+    public void shouldComposeMultipleFutures() throws ExecutionException, 
InterruptedException {
+        final KafkaFuture<Integer> root = KafkaFuture.completedFuture(1);
+        final KafkaFuture<Integer> left = root.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() {
+            @Override
+            public KafkaFuture<Integer> apply(Integer integer) {
+                return KafkaFuture.completedFuture(integer + 1);
+            }
+        });
+        final KafkaFuture<Integer> right = root.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() {
+            @Override
+            public KafkaFuture<Integer> apply(Integer integer) {
+                return KafkaFuture.completedFuture(integer + 2);
+            }
+        });
+
+        assertEquals(1, (int) root.get());
+        assertEquals(2, (int) left.get());
+        assertEquals(3, (int) right.get());
+    }
+
+    @Test
+    public void shouldNotCallComposedFutureOnFailure() {
+        final AtomicBoolean didRun = new AtomicBoolean(false);
+
+        KafkaFutureImpl<Integer> failingFuture = new KafkaFutureImpl<>();
+        final KafkaFuture<Integer> result = failingFuture.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() {
+            @Override
+            public KafkaFuture<Integer> apply(Integer integer) {
+                didRun.set(true);
+                return KafkaFuture.completedFuture(1);
+            }
+        });
+
+        failingFuture.completeExceptionally(new Exception());
+        assertTrue("A future composed from a failing future should fail", 
result.isCompletedExceptionally());
+        assertFalse("A future composed from a failing future should not run", 
didRun.get());
+    }
+
+    @Test
+    public void shouldNotCompleteUntilAllComposedFuturesInChainAreComplete() {
+        KafkaFutureImpl<Integer> firstFuture = new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Integer> secondFuture = new KafkaFutureImpl<>();
+
+        KafkaFuture<Integer> result = firstFuture.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() {
+            @Override
+            public KafkaFuture<Integer> apply(Integer integer) {
+                return secondFuture;
+            }
+        });
+
+        assertFalse(firstFuture.isDone());
+        assertFalse(secondFuture.isDone());
+        assertFalse(result.isDone());
+
+        firstFuture.complete(1);
+
+        assertTrue(firstFuture.isDone());
+        assertFalse(secondFuture.isDone());
+        assertFalse(result.isDone());
+
+        secondFuture.complete(1);
+
+        assertTrue(firstFuture.isDone());
+        assertTrue(secondFuture.isDone());
+        assertTrue(result.isDone());
+    }
+
+
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
@@ -202,8 +286,8 @@ public void testAllOfFutures() throws Exception {
         for (int i = 0; i < numThreads; i++) {
             completerThreads.get(i).join();
             waiterThreads.get(i).join();
-            assertEquals(null, completerThreads.get(i).testException);
-            assertEquals(null, waiterThreads.get(i).testException);
+            assertNull(completerThreads.get(i).testException);
+            assertNull(waiterThreads.get(i).testException);
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add thenCompose to KafkaFuture
> ------------------------------
>
>                 Key: KAFKA-6900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6900
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Richard Tjerngren
>            Priority: Minor
>
> KafkaFuture supports Future chaining via the thenApply method just like 
> CompletableFuture, however, thenApply is not intended to be used for lambdas 
> that in turn return a future:
>  
> {code:java}
> KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
> KafkaFuture<KafkaFuture<String>> nestedFuture = future.thenApply(result -> 
> methodThatReturnsFuture(result));
> {code}
> Completable future has a method called thenCompose 
> [javadoc|https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/CompletionStage.html#thenCompose(java.util.function.Function)]
> The would be:
> {code:java}
> public KafkaFuture<R> thenCompose(Function<T, KafkaFuture<T> func);{code}
> So the above example would look like this:
> {code:java}
> KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
> KafkaFuture<String> nestedFuture = future.thenCompose(result -> 
> methodThatReturnsFuture(result));
> {code}
> This would enable developers to chain asynchronous calls in a more natural 
> way and it also makes KafkaFuture behave more similar to Javas 
> CompletableFuture and Javascripts Promise
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to