[ https://issues.apache.org/jira/browse/KAFKA-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499565#comment-16499565 ]
ASF GitHub Bot commented on KAFKA-6900: --------------------------------------- RichoDemus closed pull request #5080: KAFKA-6900: Add thenCompose to KafkaFuture URL: https://github.com/apache/kafka/pull/5080 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 9cd2e01dc42..d7ee5453e39 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -115,6 +115,15 @@ private void maybeComplete() { */ public abstract <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function); + /** + * Returns a new KafkaFuture that, when this future completes normally, is executed with this + * futures's result as the argument to the supplied function. + * + * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that + * completes the future. 
+ */ + public abstract <R> KafkaFuture<R> thenCompose(BaseFunction<T, KafkaFuture<R>> function); + /** * @see KafkaFuture#thenApply(BaseFunction) * diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index 33916ac952a..f534fe3de97 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.internals; +import org.apache.kafka.common.KafkaFuture; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; @@ -23,8 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.kafka.common.KafkaFuture; - /** * A flexible future which supports call chaining and other asynchronous programming patterns. * This will eventually become a thin shim on top of Java 8's CompletableFuture. 
@@ -70,6 +70,35 @@ public void accept(A a, Throwable exception) { } } + private static class FutureApplicant<A, B> implements BiConsumer<A, Throwable> { + private final BaseFunction<A, KafkaFuture<B>> function; + private final KafkaFutureImpl<B> future; + + FutureApplicant(BaseFunction<A, KafkaFuture<B>> function, KafkaFutureImpl<B> future) { + this.function = function; + this.future = future; + } + + @Override + public void accept(A a, Throwable exception) { + if (exception != null) { + future.completeExceptionally(exception); + } else { + final KafkaFuture<B> b = function.apply(a); + b.whenComplete(new BiConsumer<B, Throwable>() { + @Override + public void accept(B result, Throwable error) { + if (error != null) { + future.completeExceptionally(error); + } else { + future.complete(result); + } + } + }); + } + } + } + private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { private R value = null; private Throwable exception = null; @@ -146,6 +175,13 @@ R await(long timeout, TimeUnit unit) return future; } + @Override + public <R> KafkaFuture<R> thenCompose(BaseFunction<T, KafkaFuture<R>> function) { + KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); + addWaiter(new FutureApplicant<>(function, future)); + return future; + } + public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) { KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future; futureImpl.addWaiter(new Applicant<>(function, this)); diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 6f9efca7c66..ebc8ab30d61 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -27,10 +27,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; 
-import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * A unit test for KafkaFuture. @@ -124,6 +126,88 @@ public Integer apply(Integer integer) { assertTrue(futureAppliedFail.isCompletedExceptionally()); } + @Test + public void shouldComposeFuture() throws ExecutionException, InterruptedException { + final KafkaFuture<Integer> one = KafkaFuture.completedFuture(1); + final KafkaFuture<Integer> two = one.thenCompose(new KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() { + @Override + public KafkaFuture<Integer> apply(Integer integer) { + return KafkaFuture.completedFuture(integer + 1); + } + }); + final int result = two.get(); + assertEquals("The future's value should be two", 2, result); + } + + @Test + public void shouldComposeMultipleFutures() throws ExecutionException, InterruptedException { + final KafkaFuture<Integer> root = KafkaFuture.completedFuture(1); + final KafkaFuture<Integer> left = root.thenCompose(new KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() { + @Override + public KafkaFuture<Integer> apply(Integer integer) { + return KafkaFuture.completedFuture(integer + 1); + } + }); + final KafkaFuture<Integer> right = root.thenCompose(new KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() { + @Override + public KafkaFuture<Integer> apply(Integer integer) { + return KafkaFuture.completedFuture(integer + 2); + } + }); + + assertEquals(1, (int) root.get()); + assertEquals(2, (int) left.get()); + assertEquals(3, (int) right.get()); + } + + @Test + public void shouldNotCallComposedFutureOnFailure() { + final AtomicBoolean didRun = new AtomicBoolean(false); + + KafkaFutureImpl<Integer> failingFuture = new KafkaFutureImpl<>(); + final KafkaFuture<Integer> result = failingFuture.thenCompose(new 
KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() { + @Override + public KafkaFuture<Integer> apply(Integer integer) { + didRun.set(true); + return KafkaFuture.completedFuture(1); + } + }); + + failingFuture.completeExceptionally(new Exception()); + assertTrue("A future composed from a failing future should fail", result.isCompletedExceptionally()); + assertFalse("A future composed from a failing future should not run", didRun.get()); + } + + @Test + public void shouldNotCompleteUntilAllComposedFuturesInChainAreComplete() { + KafkaFutureImpl<Integer> firstFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl<Integer> secondFuture = new KafkaFutureImpl<>(); + + KafkaFuture<Integer> result = firstFuture.thenCompose(new KafkaFuture.BaseFunction<Integer, KafkaFuture<Integer>>() { + @Override + public KafkaFuture<Integer> apply(Integer integer) { + return secondFuture; + } + }); + + assertFalse(firstFuture.isDone()); + assertFalse(secondFuture.isDone()); + assertFalse(result.isDone()); + + firstFuture.complete(1); + + assertTrue(firstFuture.isDone()); + assertFalse(secondFuture.isDone()); + assertFalse(result.isDone()); + + secondFuture.complete(1); + + assertTrue(firstFuture.isDone()); + assertTrue(secondFuture.isDone()); + assertTrue(result.isDone()); + } + + private static class CompleterThread<T> extends Thread { private final KafkaFutureImpl<T> future; @@ -202,8 +286,8 @@ public void testAllOfFutures() throws Exception { for (int i = 0; i < numThreads; i++) { completerThreads.get(i).join(); waiterThreads.get(i).join(); - assertEquals(null, completerThreads.get(i).testException); - assertEquals(null, waiterThreads.get(i).testException); + assertNull(completerThreads.get(i).testException); + assertNull(waiterThreads.get(i).testException); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add thenCompose to KafkaFuture > ------------------------------ > > Key: KAFKA-6900 > URL: https://issues.apache.org/jira/browse/KAFKA-6900 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 1.1.0 > Reporter: Richard Tjerngren > Priority: Minor > > KafkaFuture supports Future chaining via the thenApply method just like > CompletableFuture; however, thenApply is not intended to be used for lambdas > that in turn return a future: > > {code:java} > KafkaFutureImpl<String> future = new KafkaFutureImpl<>(); > KafkaFuture<KafkaFuture<String>> nestedFuture = future.thenApply(result -> > methodThatReturnsFuture(result)); > {code} > CompletableFuture has a method called thenCompose > [javadoc|https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/CompletionStage.html#thenCompose(java.util.function.Function)] > The signature would be: > {code:java} > public <R> KafkaFuture<R> thenCompose(Function<T, KafkaFuture<R>> func);{code} > So the above example would look like this: > {code:java} > KafkaFutureImpl<String> future = new KafkaFutureImpl<>(); > KafkaFuture<String> nestedFuture = future.thenCompose(result -> > methodThatReturnsFuture(result)); > {code} > This would enable developers to chain asynchronous calls in a more natural > way and it also makes KafkaFuture behave more like Java's > CompletableFuture and JavaScript's Promise > -- This message was sent by Atlassian JIRA (v7.6.3#76005)