This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19e8fd5  KAFKA-9687: KIP-707: Add KafkaFuture.toCompletionStage() 
(#9878)
19e8fd5 is described below

commit 19e8fd513a15b595f940370be07215863ad1ff7b
Author: Tom Bentley <[email protected]>
AuthorDate: Mon Jul 5 17:33:33 2021 +0100

    KAFKA-9687: KIP-707: Add KafkaFuture.toCompletionStage() (#9878)
    
    * Improve the test prior to reimplementing KafkaFutureImpl using 
CompletableFuture.
    * KAFKA-9687: Reimplement KafkaFutureImpl using a CompleteableFuture
    * KIP-707: Add KafkaFuture.toCompletionStage
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot 
<[email protected]>, Konstantine Karantasis <[email protected]>
---
 .../java/org/apache/kafka/common/KafkaFuture.java  |  98 ++--
 .../common/internals/KafkaCompletableFuture.java   |  95 ++++
 .../kafka/common/internals/KafkaFutureImpl.java    | 329 ++++++--------
 .../org/apache/kafka/common/KafkaFutureTest.java   | 503 +++++++++++++++++++--
 gradle/spotbugs-exclude.xml                        |   6 +
 5 files changed, 766 insertions(+), 265 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01..84aed74 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -16,25 +16,33 @@
  */
 package org.apache.kafka.common;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A flexible future which supports call chaining and other asynchronous 
programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous 
programming patterns.
  *
- * The API for this class is still evolving and we may break compatibility in 
minor releases, if necessary.
+ * <h3>Relation to {@code CompletionStage}</h3>
+ * <p>It is possible to obtain a {@code CompletionStage} from a
+ * {@code KafkaFuture} instance by calling {@link #toCompletionStage()}.
+ * If converting {@link KafkaFuture#whenComplete(BiConsumer)} or {@link 
KafkaFuture#thenApply(BaseFunction)} to
+ * {@link CompletableFuture#whenComplete(java.util.function.BiConsumer)} or
+ * {@link CompletableFuture#thenApply(java.util.function.Function)} be aware 
that the returned
+ * {@code KafkaFuture} will fail with an {@code ExecutionException}, whereas a 
{@code CompletionStage} fails
+ * with a {@code CompletionException}.
  */
[email protected]
 public abstract class KafkaFuture<T> implements Future<T> {
     /**
      * A function which takes objects of type A and returns objects of type B.
      */
+    @FunctionalInterface
     public interface BaseFunction<A, B> {
         B apply(A a);
     }
@@ -42,52 +50,24 @@ public abstract class KafkaFuture<T> implements Future<T> {
     /**
      * A function which takes objects of type A and returns objects of type B.
      *
-     * Prefer the functional interface {@link BaseFunction} over the class 
{@link Function}.  This class is here for
-     * backwards compatibility reasons and might be deprecated/removed in a 
future release.
+     * @deprecated Since Kafka 3.0. Use the {@link BaseFunction} functional 
interface.
      */
+    @Deprecated
     public static abstract class Function<A, B> implements BaseFunction<A, B> 
{ }
 
     /**
      * A consumer of two different types of object.
      */
+    @FunctionalInterface
     public interface BiConsumer<A, B> {
         void accept(A a, B b);
     }
 
-    private static class AllOfAdapter<R> implements BiConsumer<R, Throwable> {
-        private int remainingResponses;
-        private KafkaFuture<?> future;
-
-        public AllOfAdapter(int remainingResponses, KafkaFuture<?> future) {
-            this.remainingResponses = remainingResponses;
-            this.future = future;
-            maybeComplete();
-        }
-
-        @Override
-        public synchronized void accept(R newValue, Throwable exception) {
-            if (remainingResponses <= 0)
-                return;
-            if (exception != null) {
-                remainingResponses = 0;
-                future.completeExceptionally(exception);
-            } else {
-                remainingResponses--;
-                maybeComplete();
-            }
-        }
-
-        private void maybeComplete() {
-            if (remainingResponses <= 0)
-                future.complete(null);
-        }
-    }
-
     /** 
      * Returns a new KafkaFuture that is already completed with the given 
value.
      */
     public static <U> KafkaFuture<U> completedFuture(U value) {
-        KafkaFuture<U> future = new KafkaFutureImpl<U>();
+        KafkaFuture<U> future = new KafkaFutureImpl<>();
         future.complete(value);
         return future;
     }
@@ -98,15 +78,46 @@ public abstract class KafkaFuture<T> implements Future<T> {
      * an exception, which one gets returned is arbitrarily chosen.
      */
     public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
-        KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<>();
-        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, 
allOfFuture);
-        for (KafkaFuture<?> future : futures) {
-            future.addWaiter(allOfWaiter);
-        }
-        return allOfFuture;
+        KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+        CompletableFuture.allOf(Arrays.stream(futures)
+                .map(kafkaFuture -> {
+                    // Safe since KafkaFuture's only subclass is KafkaFuture 
for which toCompletionStage()
+                    // always return a CF.
+                    return (CompletableFuture<?>) 
kafkaFuture.toCompletionStage();
+                })
+                .toArray(CompletableFuture[]::new)).whenComplete((value, ex) 
-> {
+                    if (ex == null) {
+                        result.complete(value);
+                    } else {
+                        // Have to unwrap the CompletionException which 
allOf() introduced
+                        result.completeExceptionally(ex.getCause());
+                    }
+                });
+
+        return result;
     }
 
     /**
+     * Gets a {@code CompletionStage} with the same completion properties as 
this {@code KafkaFuture}.
+     * The returned instance will complete when this future completes and in 
the same way
+     * (with the same result or exception).
+     *
+     * <p>Calling {@code toCompletableFuture()} on the returned instance will 
yield a {@code CompletableFuture},
+     * but invocation of the completion methods ({@code complete()} and other 
methods in the {@code complete*()}
+     * and {@code obtrude*()} families) on that {@code CompletableFuture} 
instance will result in
+     * {@code UnsupportedOperationException} being thrown. Unlike a "minimal" 
{@code CompletableFuture},
+     * the {@code get*()} and other methods of {@code CompletableFuture} that 
are not inherited from
+     * {@code CompletionStage} will work normally.
+     *
+     * <p>If you want to block on the completion of a KafkaFuture you should 
use
+     * {@link #get()}, {@link #get(long, TimeUnit)} or {@link 
#getNow(Object)}, rather then calling
+     * {@code .toCompletionStage().toCompletableFuture().get()} etc.
+     *
+     * @since Kafka 3.0
+     */
+    public abstract CompletionStage<T> toCompletionStage();
+
+    /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
      * futures's result as the argument to the supplied function.
      *
@@ -145,7 +156,6 @@ public abstract class KafkaFuture<T> implements Future<T> {
      */
     public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super 
Throwable> action);
 
-    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> 
action);
     /**
      * If not already completed, sets the value returned by get() and related 
methods to the given
      * value.
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
new file mode 100644
index 0000000..8c75ea4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), 
completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a 
KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {
+
+    /**
+     * Completes this future normally. For internal use by the Kafka clients, 
not by user code.
+     * @param value the result value
+     * @return {@code true} if this invocation caused this CompletableFuture
+     * to transition to a completed state, else {@code false}
+     */
+    boolean kafkaComplete(T value) {
+        return super.complete(value);
+    }
+
+    /**
+     * Completes this future exceptionally. For internal use by the Kafka 
clients, not by user code.
+     * @param throwable the exception.
+     * @return {@code true} if this invocation caused this CompletableFuture
+     * to transition to a completed state, else {@code false}
+     */
+    boolean kafkaCompleteExceptionally(Throwable throwable) {
+        return super.completeExceptionally(throwable);
+    }
+
+    @Override
+    public boolean complete(T value) {
+        throw erroneousCompletionException();
+    }
+
+    @Override
+    public boolean completeExceptionally(Throwable ex) {
+        throw erroneousCompletionException();
+    }
+
+    @Override
+    public void obtrudeValue(T value) {
+        throw erroneousCompletionException();
+    }
+
+    @Override
+    public void obtrudeException(Throwable ex) {
+        throw erroneousCompletionException();
+    }
+
+    //@Override // enable once Kafka no longer supports Java 8
+    public <U> CompletableFuture<U> newIncompleteFuture() {
+        return new KafkaCompletableFuture<>();
+    }
+
+    //@Override // enable once Kafka no longer supports Java 8
+    public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 
Executor executor) {
+        throw erroneousCompletionException();
+    }
+
+    //@Override // enable once Kafka no longer supports Java 8
+    public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+        throw erroneousCompletionException();
+    }
+
+    //@Override // enable once Kafka no longer supports Java 8
+    public CompletableFuture<T> completeOnTimeout(T value, long timeout, 
TimeUnit unit) {
+        throw erroneousCompletionException();
+    }
+
+    private UnsupportedOperationException erroneousCompletionException() {
+        return new UnsupportedOperationException("User code should not 
complete futures returned from Kafka clients");
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index c0babe1..711bd25 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,9 +16,10 @@
  */
 package org.apache.kafka.common.internals;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -27,113 +28,26 @@ import org.apache.kafka.common.KafkaFuture;
 
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
- * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if 
needed.
-     *
-     * In general, KafkaFuture throws CancellationException and 
InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, 
ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
 
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
+    private final KafkaCompletableFuture<T> completableFuture;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private final boolean isDependant;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(false, new KafkaCompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException 
{
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, 
TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> 
completableFuture) {
+        this.isDependant = isDependant;
+        this.completableFuture = completableFuture;
     }
 
-    /**
-     * True if this future is done.
-     */
-    private boolean done = false;
-
-    /**
-     * The value of this future, or null.  Protected by the object monitor.
-     */
-    private T value = null;
-
-    /**
-     * The exception associated with this future, or null.  Protected by the 
object monitor.
-     */
-    private Throwable exception = null;
-
-    /**
-     * A list of objects waiting for this future to complete (either 
successfully or
-     * exceptionally).  Protected by the object monitor.
-     */
-    private List<BiConsumer<? super T, ? super Throwable>> waiters = new 
ArrayList<>();
+    @Override
+    public CompletionStage<T> toCompletionStage() {
+        return completableFuture;
+    }
 
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
@@ -141,103 +55,80 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
-        addWaiter(new Applicant<>(function, future));
-        return future;
+        CompletableFuture<R> appliedFuture = completableFuture.thenApply(value 
-> {
+            try {
+                return function.apply(value);
+            } catch (Throwable t) {
+                if (t instanceof CompletionException) {
+                    // KafkaFuture#thenApply, when the function threw 
CompletionException should return
+                    // an ExecutionException wrapping a CompletionException 
wrapping the exception thrown by the
+                    // function. CompletableFuture#thenApply will just return 
ExecutionException wrapping the
+                    // exception thrown by the function, so we add an extra 
CompletionException here to
+                    // maintain the KafkaFuture behaviour.
+                    throw new CompletionException(t);
+                } else {
+                    throw t;
+                }
+            }
+        });
+        return new KafkaFutureImpl<>(true, 
toKafkaCompletableFuture(appliedFuture));
     }
 
-    public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> 
function) {
-        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
-        futureImpl.addWaiter(new Applicant<>(function, this));
+    private static <U> KafkaCompletableFuture<U> 
toKafkaCompletableFuture(CompletableFuture<U> completableFuture) {
+        if (completableFuture instanceof KafkaCompletableFuture) {
+            return (KafkaCompletableFuture<U>) completableFuture;
+        } else {
+            final KafkaCompletableFuture<U> result = new 
KafkaCompletableFuture<>();
+            completableFuture.whenComplete((x, y) -> {
+                if (y != null) {
+                    result.kafkaCompleteExceptionally(y);
+                } else {
+                    result.kafkaComplete(x);
+                }
+            });
+            return result;
+        }
     }
 
     /**
      * @see KafkaFutureImpl#thenApply(BaseFunction)
+     * @deprecated Since Kafka 3.0.
      */
+    @Deprecated
     @Override
     public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
         return thenApply((BaseFunction<T, R>) function);
     }
 
-    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, 
Throwable> {
-        private final KafkaFutureImpl<T> future;
-        private final BiConsumer<? super T, ? super Throwable> biConsumer;
-
-        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super 
T, ? super Throwable> biConsumer) {
-            this.future = future;
-            this.biConsumer = biConsumer;
-        }
-
-        @Override
-        public void accept(T val, Throwable exception) {
+    @Override
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
+        CompletableFuture<T> tCompletableFuture = 
completableFuture.whenComplete((java.util.function.BiConsumer<? super T, ? 
super Throwable>) (a, b) -> {
             try {
-                if (exception != null) {
-                    biConsumer.accept(null, exception);
+                biConsumer.accept(a, b);
+            } catch (Throwable t) {
+                if (t instanceof CompletionException) {
+                    throw new CompletionException(t);
                 } else {
-                    biConsumer.accept(val, null);
-                }
-            } catch (Throwable e) {
-                if (exception == null) {
-                    exception = e;
+                    throw t;
                 }
             }
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                future.complete(val);
-            }
-        }
+        });
+        return new KafkaFutureImpl<>(true, 
toKafkaCompletableFuture(tCompletableFuture));
     }
 
-    @Override
-    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
-        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
-        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
-        return future;
-    }
-
-    protected synchronized void addWaiter(BiConsumer<? super T, ? super 
Throwable> action) {
-        if (exception != null) {
-            action.accept(null, exception);
-        } else if (done) {
-            action.accept(value, null);
-        } else {
-            waiters.add(action);
-        }
-    }
 
     @Override
-    public synchronized boolean complete(T newValue) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
-        synchronized (this) {
-            if (done)
-                return false;
-            value = newValue;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(newValue, null);
-        }
-        return true;
+    public boolean complete(T newValue) {
+        return completableFuture.kafkaComplete(newValue);
     }
 
     @Override
     public boolean completeExceptionally(Throwable newException) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
-        synchronized (this) {
-            if (done)
-                return false;
-            exception = newException;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(null, newException);
-        }
-        return true;
+        // CompletableFuture#get() always wraps the _cause_ of a 
CompletionException in ExecutionException
+        // (which KafkaFuture does not) so wrap CompletionException in an 
extra one to avoid losing the
+        // first CompletionException in the exception chain.
+        return completableFuture.kafkaCompleteExceptionally(
+                newException instanceof CompletionException ? new 
CompletionException(newException) : newException);
     }
 
     /**
@@ -246,8 +137,23 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      * CompletionException caused by this CancellationException.
      */
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-        return completeExceptionally(new CancellationException()) || exception 
instanceof CancellationException;
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return completableFuture.cancel(mayInterruptIfRunning);
+    }
+
+    /**
+     * We need to deal with differences between KafkaFuture's historic API and 
the API of CompletableFuture:
+     * CompletableFuture#get() does not wrap CancellationException in 
ExecutionException (nor does KafkaFuture).
+     * CompletableFuture#get() always wraps the _cause_ of a 
CompletionException in ExecutionException
+     * (which KafkaFuture does not).
+     *
+     * The semantics for KafkaFuture are that all exceptional completions of 
the future (via #completeExceptionally()
+     * or exceptions from dependants) manifest as ExecutionException, as 
observed via both get() and getNow().
+     */
+    private void maybeThrowCancellationException(Throwable cause) {
+        if (cause instanceof CancellationException) {
+            throw (CancellationException) cause;
+        }
     }
 
     /**
@@ -255,9 +161,12 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      */
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await();
+        try {
+            return completableFuture.get();
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
@@ -267,9 +176,12 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
@@ -277,40 +189,69 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public T getNow(T valueIfAbsent) throws ExecutionException {
+        try {
+            return completableFuture.getNow(valueIfAbsent);
+        } catch (CompletionException e) {
+            maybeThrowCancellationException(e.getCause());
+            // Note, unlike CompletableFuture#get() which throws 
ExecutionException, CompletableFuture#getNow()
+            // throws CompletionException, thus needs rewrapping to conform to 
KafkaFuture API,
+            // where KafkaFuture#getNow() throws ExecutionException.
+            throw new ExecutionException(e.getCause());
+        }
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it 
completed normally.
      */
     @Override
-    public synchronized boolean isCancelled() {
-        return exception instanceof CancellationException;
+    public boolean isCancelled() {
+        if (isDependant) {
+            // Having isCancelled() for a dependent future just return
+            // CompletableFuture.isCancelled() would break the historical 
KafkaFuture behaviour because
+            // CompletableFuture#isCancelled() just checks for the exception 
being CancellationException
+            // whereas it will be a CompletionException wrapping a 
CancellationException
+            // due needing to compensate for CompletableFuture's 
CompletionException unwrapping
+            // shenanigans in other methods.
+            try {
+                completableFuture.getNow(null);
+                return false;
+            } catch (Exception e) {
+                return e instanceof CompletionException
+                        && e.getCause() instanceof CancellationException;
+            }
+        } else {
+            return completableFuture.isCancelled();
+        }
     }
 
     /**
      * Returns true if this CompletableFuture completed exceptionally, in any 
way.
      */
     @Override
-    public synchronized boolean isCompletedExceptionally() {
-        return exception != null;
+    public boolean isCompletedExceptionally() {
+        return completableFuture.isCompletedExceptionally();
     }
 
     /**
      * Returns true if completed in any fashion: normally, exceptionally, or 
via cancellation.
      */
     @Override
-    public synchronized boolean isDone() {
-        return done;
+    public boolean isDone() {
+        return completableFuture.isDone();
     }
 
     @Override
     public String toString() {
-        return String.format("KafkaFuture{value=%s,exception=%s,done=%b}", 
value, exception, done);
+        T value = null;
+        Throwable exception = null;
+        try {
+            value = completableFuture.getNow(null);
+        } catch (CompletionException e) {
+            exception = e.getCause();
+        } catch (Exception e) {
+            exception = e;
+        }
+        return String.format("KafkaFuture{value=%s,exception=%s,done=%b}", 
value, exception, exception != null || value != null);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java 
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 6fc91ea..0218ce1 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -17,15 +17,24 @@
 package org.apache.kafka.common;
 
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -38,47 +47,197 @@ import static 
org.junit.jupiter.api.Assertions.assertEquals;
 @Timeout(120)
 public class KafkaFutureTest {
 
+    /** Asserts that the given future is done, didn't fail and wasn't 
cancelled. */
+    private void assertIsSuccessful(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    /** Asserts that the given future is done, failed and wasn't cancelled. */
+    private void assertIsFailed(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    /** Asserts that the given future is done, didn't fail and was cancelled. 
*/
+    private void assertIsCancelled(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+        assertThrows(CancellationException.class, () -> future.getNow(null));
+        assertThrows(CancellationException.class, () -> future.get(0, 
TimeUnit.MILLISECONDS));
+    }
+
+    private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+                                          T expectedResult,
+                                          T alternativeValue) {
+        assertNotEquals(expectedResult, alternativeValue);
+        try {
+            assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.get());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.getNow(alternativeValue));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private Throwable awaitAndAssertFailure(KafkaFuture<?> future,
+                                            Class<? extends Throwable> 
expectedException,
+                                            String expectedMessage) {
+        ExecutionException executionException = 
assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.MINUTES));
+        assertEquals(expectedException, 
executionException.getCause().getClass());
+        assertEquals(expectedMessage, 
executionException.getCause().getMessage());
+
+        executionException = assertThrows(ExecutionException.class, () -> 
future.get());
+        assertEquals(expectedException, 
executionException.getCause().getClass());
+        assertEquals(expectedMessage, 
executionException.getCause().getMessage());
+
+        executionException = assertThrows(ExecutionException.class, () -> 
future.getNow(null));
+        assertEquals(expectedException, 
executionException.getCause().getClass());
+        assertEquals(expectedMessage, 
executionException.getCause().getMessage());
+        return executionException.getCause();
+    }
+
+    private void awaitAndAssertCancelled(KafkaFuture<?> future, String 
expectedMessage) {
+        CancellationException cancellationException = 
assertThrows(CancellationException.class, () -> future.get(5, 
TimeUnit.MINUTES));
+        assertEquals(expectedMessage, cancellationException.getMessage());
+        assertEquals(CancellationException.class, 
cancellationException.getClass());
+
+        cancellationException = assertThrows(CancellationException.class, () 
-> future.get());
+        assertEquals(expectedMessage, cancellationException.getMessage());
+        assertEquals(CancellationException.class, 
cancellationException.getClass());
+
+        cancellationException = assertThrows(CancellationException.class, () 
-> future.getNow(null));
+        assertEquals(expectedMessage, cancellationException.getMessage());
+        assertEquals(CancellationException.class, 
cancellationException.getClass());
+    }
+
     @Test
     public void testCompleteFutures() throws Exception {
         KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
         assertTrue(future123.complete(123));
-        assertEquals(Integer.valueOf(123), future123.get());
         assertFalse(future123.complete(456));
-        assertTrue(future123.isDone());
-        assertFalse(future123.isCancelled());
-        assertFalse(future123.isCompletedExceptionally());
+        assertFalse(future123.cancel(true));
+        assertEquals(Integer.valueOf(123), future123.get());
+        assertIsSuccessful(future123);
 
         KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+        assertFalse(future456.complete(789));
+        assertFalse(future456.cancel(true));
         assertEquals(Integer.valueOf(456), future456.get());
+        assertIsSuccessful(future456);
+    }
 
+    @Test
+    public void testCompleteFuturesExceptionally() throws Exception {
         KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
-        futureFail.completeExceptionally(new RuntimeException("We require more 
vespene gas"));
-        ExecutionException e = assertThrows(ExecutionException.class, 
futureFail::get);
-        assertEquals(RuntimeException.class, e.getCause().getClass());
-        assertEquals("We require more vespene gas", e.getCause().getMessage());
+        assertTrue(futureFail.completeExceptionally(new RuntimeException("We 
require more vespene gas")));
+        assertIsFailed(futureFail);
+        assertFalse(futureFail.completeExceptionally(new RuntimeException("We 
require more minerals")));
+        assertFalse(futureFail.cancel(true));
+
+        ExecutionException executionException = 
assertThrows(ExecutionException.class, () -> futureFail.get());
+        assertEquals(RuntimeException.class, 
executionException.getCause().getClass());
+        assertEquals("We require more vespene gas", 
executionException.getCause().getMessage());
+
+        KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
+        assertTrue(tricky1.completeExceptionally(new CompletionException(new 
CancellationException())));
+        assertIsFailed(tricky1);
+        awaitAndAssertFailure(tricky1, CompletionException.class, 
"java.util.concurrent.CancellationException");
+    }
+
+    @Test
+    public void testCompleteFuturesViaCancellation() {
+        KafkaFutureImpl<Integer> viaCancel = new KafkaFutureImpl<>();
+        assertTrue(viaCancel.cancel(true));
+        assertIsCancelled(viaCancel);
+        awaitAndAssertCancelled(viaCancel, null);
+
+        KafkaFutureImpl<Integer> viaCancellationException = new 
KafkaFutureImpl<>();
+        assertTrue(viaCancellationException.completeExceptionally(new 
CancellationException("We require more vespene gas")));
+        assertIsCancelled(viaCancellationException);
+        awaitAndAssertCancelled(viaCancellationException, "We require more 
vespene gas");
+    }
+
+    @Test
+    public void testToString() {
+        KafkaFutureImpl<Integer> success = new KafkaFutureImpl<>();
+        assertEquals("KafkaFuture{value=null,exception=null,done=false}", 
success.toString());
+        success.complete(12);
+        assertEquals("KafkaFuture{value=12,exception=null,done=true}", 
success.toString());
+
+        KafkaFutureImpl<Integer> failure = new KafkaFutureImpl<>();
+        failure.completeExceptionally(new RuntimeException("foo"));
+        
assertEquals("KafkaFuture{value=null,exception=java.lang.RuntimeException: 
foo,done=true}", failure.toString());
+
+        KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
+        tricky1.completeExceptionally(new CompletionException(new 
CancellationException()));
+        
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CompletionException:
 java.util.concurrent.CancellationException,done=true}", tricky1.toString());
+
+        KafkaFutureImpl<Integer> cancelled = new KafkaFutureImpl<>();
+        cancelled.cancel(true);
+        
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CancellationException,done=true}",
 cancelled.toString());
     }
 
     @Test
     public void testCompletingFutures() throws Exception {
         final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
         CompleterThread<String> myThread = new CompleterThread<>(future, "You 
must construct additional pylons.");
-        assertFalse(future.isDone());
-        assertFalse(future.isCompletedExceptionally());
-        assertFalse(future.isCancelled());
+        assertIsNotCompleted(future);
         assertEquals("I am ready", future.getNow("I am ready"));
         myThread.start();
-        String str = future.get(5, TimeUnit.MINUTES);
-        assertEquals("You must construct additional pylons.", str);
-        assertEquals("You must construct additional pylons.", future.getNow("I 
am ready"));
-        assertTrue(future.isDone());
-        assertFalse(future.isCompletedExceptionally());
-        assertFalse(future.isCancelled());
+        awaitAndAssertResult(future, "You must construct additional pylons.", 
"I am ready");
+        assertIsSuccessful(future);
+        myThread.join();
+        assertNull(myThread.testException);
+    }
+
+    @Test
+    public void testCompletingFuturesExceptionally() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        CompleterThread<String> myThread = new CompleterThread<>(future, null,
+                new RuntimeException("Ultimate efficiency achieved."));
+        assertIsNotCompleted(future);
+        assertEquals("I am ready", future.getNow("I am ready"));
+        myThread.start();
+        awaitAndAssertFailure(future, RuntimeException.class, "Ultimate 
efficiency achieved.");
+        assertIsFailed(future);
         myThread.join();
         assertNull(myThread.testException);
     }
 
     @Test
-    public void testThenApply() throws Exception {
+    public void testCompletingFuturesViaCancellation() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        CompleterThread<String> myThread = new CompleterThread<>(future, null,
+                new CancellationException("Ultimate efficiency achieved."));
+        assertIsNotCompleted(future);
+        assertEquals("I am ready", future.getNow("I am ready"));
+        myThread.start();
+        awaitAndAssertCancelled(future, "Ultimate efficiency achieved.");
+        assertIsCancelled(future);
+        myThread.join();
+        assertNull(myThread.testException);
+    }
+
+    private void assertIsNotCompleted(KafkaFutureImpl<String> future) {
+        assertFalse(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    @Test
+    public void testThenApplyOnSucceededFuture() throws Exception {
         KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
         KafkaFuture<Integer> doubledFuture = future.thenApply(integer -> 2 * 
integer);
         assertFalse(doubledFuture.isDone());
@@ -90,23 +249,207 @@ public class KafkaFutureTest {
         assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
         KafkaFuture<Integer> quadrupledFuture = future.thenApply(integer -> 4 
* integer);
         assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
+    }
 
-        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
-        KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(integer 
-> 2 * integer);
-        futureFail.completeExceptionally(new RuntimeException());
-        assertTrue(futureFail.isCompletedExceptionally());
-        assertTrue(futureAppliedFail.isCompletedExceptionally());
+    @Test
+    public void testThenApplyOnFailedFuture() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * 
integer);
+        future.completeExceptionally(new RuntimeException("We require more 
vespene gas"));
+        assertIsFailed(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(future, RuntimeException.class, "We require more 
vespene gas");
+        awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We 
require more vespene gas");
+    }
+
+    @Test
+    public void testThenApplyOnFailedFutureTricky() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * 
integer);
+        future.completeExceptionally(new CompletionException(new 
RuntimeException("We require more vespene gas")));
+        assertIsFailed(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(future, CompletionException.class, 
"java.lang.RuntimeException: We require more vespene gas");
+        awaitAndAssertFailure(dependantFuture, CompletionException.class, 
"java.lang.RuntimeException: We require more vespene gas");
+    }
+
+    @Test
+    public void testThenApplyOnFailedFutureTricky2() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * 
integer);
+        future.completeExceptionally(new CompletionException(new 
CancellationException()));
+        assertIsFailed(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(future, CompletionException.class, 
"java.util.concurrent.CancellationException");
+        awaitAndAssertFailure(dependantFuture, CompletionException.class, 
"java.util.concurrent.CancellationException");
+    }
+
+    @Test
+    public void testThenApplyOnSucceededFutureAndFunctionThrows() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+            throw new RuntimeException("We require more vespene gas");
+        });
+        future.complete(21);
+        assertIsSuccessful(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertResult(future, 21, null);
+        awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We 
require more vespene gas");
+    }
+
+    @Test
+    public void 
testThenApplyOnSucceededFutureAndFunctionThrowsCompletionException() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+            throw new CompletionException(new RuntimeException("We require 
more vespene gas"));
+        });
+        future.complete(21);
+        assertIsSuccessful(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertResult(future, 21, null);
+        Throwable cause = awaitAndAssertFailure(dependantFuture, 
CompletionException.class, "java.lang.RuntimeException: We require more vespene 
gas");
+        assertTrue(cause.getCause() instanceof RuntimeException);
+        assertEquals(cause.getCause().getMessage(), "We require more vespene 
gas");
+    }
+
+    @Test
+    public void testThenApplyOnFailedFutureFunctionNotCalled() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+            // Because the top level future failed, this should never be 
called.
+            ran[0] = true;
+            return null;
+        });
+        future.completeExceptionally(new RuntimeException("We require more 
minerals"));
+        assertIsFailed(future);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(future, RuntimeException.class, "We require more 
minerals");
+        awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We 
require more minerals");
+        assertFalse(ran[0]);
+    }
+
+    @Test
+    public void testThenApplyOnCancelledFuture() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * 
integer);
+        future.cancel(true);
+        assertIsCancelled(future);
+        assertIsCancelled(dependantFuture);
+        awaitAndAssertCancelled(future, null);
+        awaitAndAssertCancelled(dependantFuture, null);
+    }
+
+    @Test
+    public void testWhenCompleteOnSucceededFuture() throws Throwable {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        Throwable[] err = new Throwable[1];
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, 
ex) -> {
+            ran[0] = true;
+            try {
+                assertEquals(Integer.valueOf(21), integer);
+                if (ex != null) {
+                    throw ex;
+                }
+            } catch (Throwable e) {
+                err[0] = e;
+            }
+        });
+        assertFalse(dependantFuture.isDone());
+        assertTrue(future.complete(21));
+        assertTrue(ran[0]);
+        if (err[0] != null) {
+            throw err[0];
+        }
+    }
+
+    @Test
+    public void testWhenCompleteOnFailedFuture() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        Throwable[] err = new Throwable[1];
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, 
ex) -> {
+            ran[0] = true;
+            err[0] = ex;
+            if (integer != null) {
+                err[0] = new AssertionError();
+            }
+        });
+        assertFalse(dependantFuture.isDone());
+        RuntimeException ex = new RuntimeException("We require more vespene 
gas");
+        assertTrue(future.completeExceptionally(ex));
+        assertTrue(ran[0]);
+        assertEquals(err[0], ex);
+    }
+
+    @Test
+    public void testWhenCompleteOnSucceededFutureAndConsumerThrows() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, 
ex) -> {
+            ran[0] = true;
+            throw new RuntimeException("We require more minerals");
+        });
+        assertFalse(dependantFuture.isDone());
+        assertTrue(future.complete(21));
+        assertIsSuccessful(future);
+        assertTrue(ran[0]);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We 
require more minerals");
+    }
+
+    @Test
+    public void testWhenCompleteOnFailedFutureAndConsumerThrows() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, 
ex) -> {
+            ran[0] = true;
+            throw new RuntimeException("We require more minerals");
+        });
+        assertFalse(dependantFuture.isDone());
+        assertTrue(future.completeExceptionally(new RuntimeException("We 
require more vespene gas")));
+        assertIsFailed(future);
+        assertTrue(ran[0]);
+        assertIsFailed(dependantFuture);
+        awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We 
require more vespene gas");
+    }
+
+    @Test
+    public void testWhenCompleteOnCancelledFuture() {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        Throwable[] err = new Throwable[1];
+        boolean[] ran = {false};
+        KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, 
ex) -> {
+            ran[0] = true;
+            err[0] = ex;
+            if (integer != null) {
+                err[0] = new AssertionError();
+            }
+        });
+        assertFalse(dependantFuture.isDone());
+        assertTrue(future.cancel(true));
+        assertTrue(ran[0]);
+        assertTrue(err[0] instanceof CancellationException);
     }
 
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
         private final T value;
+        private final Throwable exception;
         Throwable testException = null;
 
         CompleterThread(KafkaFutureImpl<T> future, T value) {
             this.future = future;
             this.value = value;
+            this.exception = null;
+        }
+
+        CompleterThread(KafkaFutureImpl<T> future, T value, Exception 
exception) {
+            this.future = future;
+            this.value = value;
+            this.exception = exception;
         }
 
         @Override
@@ -116,7 +459,11 @@ public class KafkaFutureTest {
                     Thread.sleep(0, 200);
                 } catch (InterruptedException e) {
                 }
-                future.complete(value);
+                if (exception == null) {
+                    future.complete(value);
+                } else {
+                    future.completeExceptionally(exception);
+                }
             } catch (Throwable testException) {
                 this.testException = testException;
             }
@@ -153,8 +500,8 @@ public class KafkaFutureTest {
             futures.add(new KafkaFutureImpl<>());
         }
         KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new 
KafkaFuture[0]));
-        final List<CompleterThread> completerThreads = new ArrayList<>();
-        final List<WaiterThread> waiterThreads = new ArrayList<>();
+        final List<CompleterThread<Integer>> completerThreads = new 
ArrayList<>();
+        final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             completerThreads.add(new CompleterThread<>(futures.get(i), i));
             waiterThreads.add(new WaiterThread<>(futures.get(i), i));
@@ -169,7 +516,7 @@ public class KafkaFutureTest {
         assertFalse(allFuture.isDone());
         completerThreads.get(numThreads - 1).start();
         allFuture.get();
-        assertTrue(allFuture.isDone());
+        assertIsSuccessful(allFuture);
         for (int i = 0; i < numThreads; i++) {
             assertEquals(Integer.valueOf(i), futures.get(i).get());
         }
@@ -182,6 +529,52 @@ public class KafkaFutureTest {
     }
 
     @Test
+    public void testAllOfFuturesWithFailure() throws Exception {
+        final int numThreads = 5;
+        final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            futures.add(new KafkaFutureImpl<>());
+        }
+        KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new 
KafkaFuture[0]));
+        final List<CompleterThread<Integer>> completerThreads = new 
ArrayList<>();
+        final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
+        int lastIndex = numThreads - 1;
+        for (int i = 0; i < lastIndex; i++) {
+            completerThreads.add(new CompleterThread<>(futures.get(i), i));
+            waiterThreads.add(new WaiterThread<>(futures.get(i), i));
+        }
+        completerThreads.add(new CompleterThread<>(futures.get(lastIndex), 
null, new RuntimeException("Last one failed")));
+        waiterThreads.add(new WaiterThread<>(futures.get(lastIndex), 
lastIndex));
+        assertFalse(allFuture.isDone());
+        for (int i = 0; i < numThreads; i++) {
+            waiterThreads.get(i).start();
+        }
+        for (int i = 0; i < lastIndex; i++) {
+            completerThreads.get(i).start();
+        }
+        assertFalse(allFuture.isDone());
+        completerThreads.get(lastIndex).start();
+        awaitAndAssertFailure(allFuture, RuntimeException.class, "Last one 
failed");
+        assertIsFailed(allFuture);
+        for (int i = 0; i < lastIndex; i++) {
+            assertEquals(Integer.valueOf(i), futures.get(i).get());
+        }
+        assertIsFailed(futures.get(lastIndex));
+        for (int i = 0; i < numThreads; i++) {
+            completerThreads.get(i).join();
+            waiterThreads.get(i).join();
+            assertNull(completerThreads.get(i).testException);
+            if (i == lastIndex) {
+                assertEquals(ExecutionException.class, 
waiterThreads.get(i).testException.getClass());
+                assertEquals(RuntimeException.class, 
waiterThreads.get(i).testException.getCause().getClass());
+                assertEquals("Last one failed", 
waiterThreads.get(i).testException.getCause().getMessage());
+            } else {
+                assertNull(waiterThreads.get(i).testException);
+            }
+        }
+    }
+
+    @Test
     public void testAllOfFuturesHandlesZeroFutures() throws Exception {
         KafkaFuture<Void> allFuture = KafkaFuture.allOf();
         assertTrue(allFuture.isDone());
@@ -196,4 +589,60 @@ public class KafkaFutureTest {
         assertThrows(TimeoutException.class, () -> future.get(0, 
TimeUnit.MILLISECONDS));
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLeakCompletableFuture() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+        final KafkaFutureImpl<String> kfut = new KafkaFutureImpl<>();
+        CompletableFuture<String> comfut = 
kfut.toCompletionStage().toCompletableFuture();
+        assertThrows(UnsupportedOperationException.class, () -> 
comfut.complete(""));
+        assertThrows(UnsupportedOperationException.class, () -> 
comfut.completeExceptionally(new RuntimeException()));
+        // Annoyingly CompletableFuture added some more methods in Java 9, but 
the tests need to run on Java 8
+        // so test reflectively
+        if (Java.IS_JAVA9_COMPATIBLE) {
+            Method completeOnTimeout = 
CompletableFuture.class.getDeclaredMethod("completeOnTimeout", Object.class, 
Long.TYPE, TimeUnit.class);
+            assertThrows(UnsupportedOperationException.class, () -> {
+                try {
+                    completeOnTimeout.invoke(comfut, "", 1L, 
TimeUnit.MILLISECONDS);
+                } catch (InvocationTargetException e) {
+                    throw e.getCause();
+                }
+            });
+
+            Method completeAsync = 
CompletableFuture.class.getDeclaredMethod("completeAsync", Supplier.class);
+            assertThrows(UnsupportedOperationException.class, () -> {
+                try {
+                    completeAsync.invoke(comfut, (Supplier<String>) () -> "");
+                } catch (InvocationTargetException e) {
+                    throw e.getCause();
+                }
+            });
+
+            Method obtrudeValue = 
CompletableFuture.class.getDeclaredMethod("obtrudeValue", Object.class);
+            assertThrows(UnsupportedOperationException.class, () -> {
+                try {
+                    obtrudeValue.invoke(comfut, "");
+                } catch (InvocationTargetException e) {
+                    throw e.getCause();
+                }
+            });
+
+            Method obtrudeException = 
CompletableFuture.class.getDeclaredMethod("obtrudeException", Throwable.class);
+            assertThrows(UnsupportedOperationException.class, () -> {
+                try {
+                    obtrudeException.invoke(comfut, new RuntimeException());
+                } catch (InvocationTargetException e) {
+                    throw e.getCause();
+                }
+            });
+
+            // Check the CF from a minimal CompletionStage doesn't cause 
completion of the original KafkaFuture
+            Method minimal = 
CompletableFuture.class.getDeclaredMethod("minimalCompletionStage");
+            CompletionStage<String> cs = (CompletionStage<String>) 
minimal.invoke(comfut);
+            cs.toCompletableFuture().complete("");
+
+            assertFalse(kfut.isDone());
+            assertFalse(comfut.isDone());
+        }
+    }
+
 }
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index b86226c..878cd01 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -99,6 +99,12 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/>
     </Match>
 
+    <!-- false positive, see https://github.com/spotbugs/spotbugs/issues/1001 
-->
+    <Match>
+        <Class name="org.apache.kafka.common.internals.KafkaFutureImpl"/>
+        <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+    </Match>
+
     <Match>
         <!-- Suppression for the equals() for extension methods. -->
         <Class name="kafka.api.package$ElectLeadersRequestOps"/>

Reply via email to