This is an automated email from the ASF dual-hosted git repository.
tombentley pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19e8fd5 KAFKA-9687: KIP-707: Add KafkaFuture.toCompletionStage()
(#9878)
19e8fd5 is described below
commit 19e8fd513a15b595f940370be07215863ad1ff7b
Author: Tom Bentley <[email protected]>
AuthorDate: Mon Jul 5 17:33:33 2021 +0100
KAFKA-9687: KIP-707: Add KafkaFuture.toCompletionStage() (#9878)
* Improve the test prior to reimplementing KafkaFutureImpl using
CompletableFuture.
* KAFKA-9687: Reimplement KafkaFutureImpl using a CompleteableFuture
* KIP-707: Add KafkaFuture.toCompletionStage
Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
<[email protected]>, Konstantine Karantasis <[email protected]>
---
.../java/org/apache/kafka/common/KafkaFuture.java | 98 ++--
.../common/internals/KafkaCompletableFuture.java | 95 ++++
.../kafka/common/internals/KafkaFutureImpl.java | 329 ++++++--------
.../org/apache/kafka/common/KafkaFutureTest.java | 503 +++++++++++++++++++--
gradle/spotbugs-exclude.xml | 6 +
5 files changed, 766 insertions(+), 265 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01..84aed74 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -16,25 +16,33 @@
*/
package org.apache.kafka.common;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
- * A flexible future which supports call chaining and other asynchronous
programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous
programming patterns.
*
- * The API for this class is still evolving and we may break compatibility in
minor releases, if necessary.
+ * <h3>Relation to {@code CompletionStage}</h3>
+ * <p>It is possible to obtain a {@code CompletionStage} from a
+ * {@code KafkaFuture} instance by calling {@link #toCompletionStage()}.
+ * If converting {@link KafkaFuture#whenComplete(BiConsumer)} or {@link
KafkaFuture#thenApply(BaseFunction)} to
+ * {@link CompletableFuture#whenComplete(java.util.function.BiConsumer)} or
+ * {@link CompletableFuture#thenApply(java.util.function.Function)} be aware
that the returned
+ * {@code KafkaFuture} will fail with an {@code ExecutionException}, whereas a
{@code CompletionStage} fails
+ * with a {@code CompletionException}.
*/
[email protected]
public abstract class KafkaFuture<T> implements Future<T> {
/**
* A function which takes objects of type A and returns objects of type B.
*/
+ @FunctionalInterface
public interface BaseFunction<A, B> {
B apply(A a);
}
@@ -42,52 +50,24 @@ public abstract class KafkaFuture<T> implements Future<T> {
/**
* A function which takes objects of type A and returns objects of type B.
*
- * Prefer the functional interface {@link BaseFunction} over the class
{@link Function}. This class is here for
- * backwards compatibility reasons and might be deprecated/removed in a
future release.
+ * @deprecated Since Kafka 3.0. Use the {@link BaseFunction} functional
interface.
*/
+ @Deprecated
public static abstract class Function<A, B> implements BaseFunction<A, B>
{ }
/**
* A consumer of two different types of object.
*/
+ @FunctionalInterface
public interface BiConsumer<A, B> {
void accept(A a, B b);
}
- private static class AllOfAdapter<R> implements BiConsumer<R, Throwable> {
- private int remainingResponses;
- private KafkaFuture<?> future;
-
- public AllOfAdapter(int remainingResponses, KafkaFuture<?> future) {
- this.remainingResponses = remainingResponses;
- this.future = future;
- maybeComplete();
- }
-
- @Override
- public synchronized void accept(R newValue, Throwable exception) {
- if (remainingResponses <= 0)
- return;
- if (exception != null) {
- remainingResponses = 0;
- future.completeExceptionally(exception);
- } else {
- remainingResponses--;
- maybeComplete();
- }
- }
-
- private void maybeComplete() {
- if (remainingResponses <= 0)
- future.complete(null);
- }
- }
-
/**
* Returns a new KafkaFuture that is already completed with the given
value.
*/
public static <U> KafkaFuture<U> completedFuture(U value) {
- KafkaFuture<U> future = new KafkaFutureImpl<U>();
+ KafkaFuture<U> future = new KafkaFutureImpl<>();
future.complete(value);
return future;
}
@@ -98,15 +78,46 @@ public abstract class KafkaFuture<T> implements Future<T> {
* an exception, which one gets returned is arbitrarily chosen.
*/
public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
- KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<>();
- AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length,
allOfFuture);
- for (KafkaFuture<?> future : futures) {
- future.addWaiter(allOfWaiter);
- }
- return allOfFuture;
+ KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+ CompletableFuture.allOf(Arrays.stream(futures)
+ .map(kafkaFuture -> {
+ // Safe since KafkaFuture's only subclass is KafkaFuture
for which toCompletionStage()
+ // always return a CF.
+ return (CompletableFuture<?>)
kafkaFuture.toCompletionStage();
+ })
+ .toArray(CompletableFuture[]::new)).whenComplete((value, ex)
-> {
+ if (ex == null) {
+ result.complete(value);
+ } else {
+ // Have to unwrap the CompletionException which
allOf() introduced
+ result.completeExceptionally(ex.getCause());
+ }
+ });
+
+ return result;
}
/**
+ * Gets a {@code CompletionStage} with the same completion properties as
this {@code KafkaFuture}.
+ * The returned instance will complete when this future completes and in
the same way
+ * (with the same result or exception).
+ *
+ * <p>Calling {@code toCompletableFuture()} on the returned instance will
yield a {@code CompletableFuture},
+ * but invocation of the completion methods ({@code complete()} and other
methods in the {@code complete*()}
+ * and {@code obtrude*()} families) on that {@code CompletableFuture}
instance will result in
+ * {@code UnsupportedOperationException} being thrown. Unlike a "minimal"
{@code CompletableFuture},
+ * the {@code get*()} and other methods of {@code CompletableFuture} that
are not inherited from
+ * {@code CompletionStage} will work normally.
+ *
+ * <p>If you want to block on the completion of a KafkaFuture you should
use
+ * {@link #get()}, {@link #get(long, TimeUnit)} or {@link
#getNow(Object)}, rather then calling
+ * {@code .toCompletionStage().toCompletableFuture().get()} etc.
+ *
+ * @since Kafka 3.0
+ */
+ public abstract CompletionStage<T> toCompletionStage();
+
+ /**
* Returns a new KafkaFuture that, when this future completes normally, is
executed with this
* futures's result as the argument to the supplied function.
*
@@ -145,7 +156,6 @@ public abstract class KafkaFuture<T> implements Future<T> {
*/
public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super
Throwable> action);
- protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable>
action);
/**
* If not already completed, sets the value returned by get() and related
methods to the given
* value.
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
new file mode 100644
index 0000000..8c75ea4
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(),
completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a
KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {
+
+ /**
+ * Completes this future normally. For internal use by the Kafka clients,
not by user code.
+ * @param value the result value
+ * @return {@code true} if this invocation caused this CompletableFuture
+ * to transition to a completed state, else {@code false}
+ */
+ boolean kafkaComplete(T value) {
+ return super.complete(value);
+ }
+
+ /**
+ * Completes this future exceptionally. For internal use by the Kafka
clients, not by user code.
+ * @param throwable the exception.
+ * @return {@code true} if this invocation caused this CompletableFuture
+ * to transition to a completed state, else {@code false}
+ */
+ boolean kafkaCompleteExceptionally(Throwable throwable) {
+ return super.completeExceptionally(throwable);
+ }
+
+ @Override
+ public boolean complete(T value) {
+ throw erroneousCompletionException();
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ throw erroneousCompletionException();
+ }
+
+ @Override
+ public void obtrudeValue(T value) {
+ throw erroneousCompletionException();
+ }
+
+ @Override
+ public void obtrudeException(Throwable ex) {
+ throw erroneousCompletionException();
+ }
+
+ //@Override // enable once Kafka no longer supports Java 8
+ public <U> CompletableFuture<U> newIncompleteFuture() {
+ return new KafkaCompletableFuture<>();
+ }
+
+ //@Override // enable once Kafka no longer supports Java 8
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
Executor executor) {
+ throw erroneousCompletionException();
+ }
+
+ //@Override // enable once Kafka no longer supports Java 8
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+ throw erroneousCompletionException();
+ }
+
+ //@Override // enable once Kafka no longer supports Java 8
+ public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
+ throw erroneousCompletionException();
+ }
+
+ private UnsupportedOperationException erroneousCompletionException() {
+ return new UnsupportedOperationException("User code should not
complete futures returned from Kafka clients");
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index c0babe1..711bd25 100644
---
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,9 +16,10 @@
*/
package org.apache.kafka.common.internals;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -27,113 +28,26 @@ import org.apache.kafka.common.KafkaFuture;
/**
* A flexible future which supports call chaining and other asynchronous
programming patterns.
- * This will eventually become a thin shim on top of Java 8's
CompletableFuture.
*/
public class KafkaFutureImpl<T> extends KafkaFuture<T> {
- /**
- * A convenience method that throws the current exception, wrapping it if
needed.
- *
- * In general, KafkaFuture throws CancellationException and
InterruptedException directly, and
- * wraps all other exceptions in an ExecutionException.
- */
- private static void wrapAndThrow(Throwable t) throws InterruptedException,
ExecutionException {
- if (t instanceof CancellationException) {
- throw (CancellationException) t;
- } else if (t instanceof InterruptedException) {
- throw (InterruptedException) t;
- } else {
- throw new ExecutionException(t);
- }
- }
- private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
- private final BaseFunction<A, B> function;
- private final KafkaFutureImpl<B> future;
+ private final KafkaCompletableFuture<T> completableFuture;
- Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
- this.function = function;
- this.future = future;
- }
+ private final boolean isDependant;
- @Override
- public void accept(A a, Throwable exception) {
- if (exception != null) {
- future.completeExceptionally(exception);
- } else {
- try {
- B b = function.apply(a);
- future.complete(b);
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- }
- }
+ public KafkaFutureImpl() {
+ this(false, new KafkaCompletableFuture<>());
}
- private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
- private R value = null;
- private Throwable exception = null;
- private boolean done = false;
-
- @Override
- public synchronized void accept(R newValue, Throwable newException) {
- this.value = newValue;
- this.exception = newException;
- this.done = true;
- this.notifyAll();
- }
-
- synchronized R await() throws InterruptedException, ExecutionException
{
- while (true) {
- if (exception != null)
- wrapAndThrow(exception);
- if (done)
- return value;
- this.wait();
- }
- }
-
- R await(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
TimeoutException {
- long startMs = System.currentTimeMillis();
- long waitTimeMs = unit.toMillis(timeout);
- long delta = 0;
- synchronized (this) {
- while (true) {
- if (exception != null)
- wrapAndThrow(exception);
- if (done)
- return value;
- if (delta >= waitTimeMs) {
- throw new TimeoutException();
- }
- this.wait(waitTimeMs - delta);
- delta = System.currentTimeMillis() - startMs;
- }
- }
- }
+ private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T>
completableFuture) {
+ this.isDependant = isDependant;
+ this.completableFuture = completableFuture;
}
- /**
- * True if this future is done.
- */
- private boolean done = false;
-
- /**
- * The value of this future, or null. Protected by the object monitor.
- */
- private T value = null;
-
- /**
- * The exception associated with this future, or null. Protected by the
object monitor.
- */
- private Throwable exception = null;
-
- /**
- * A list of objects waiting for this future to complete (either
successfully or
- * exceptionally). Protected by the object monitor.
- */
- private List<BiConsumer<? super T, ? super Throwable>> waiters = new
ArrayList<>();
+ @Override
+ public CompletionStage<T> toCompletionStage() {
+ return completableFuture;
+ }
/**
* Returns a new KafkaFuture that, when this future completes normally, is
executed with this
@@ -141,103 +55,80 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
*/
@Override
public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
- KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
- addWaiter(new Applicant<>(function, future));
- return future;
+ CompletableFuture<R> appliedFuture = completableFuture.thenApply(value
-> {
+ try {
+ return function.apply(value);
+ } catch (Throwable t) {
+ if (t instanceof CompletionException) {
+ // KafkaFuture#thenApply, when the function threw
CompletionException should return
+ // an ExecutionException wrapping a CompletionException
wrapping the exception thrown by the
+ // function. CompletableFuture#thenApply will just return
ExecutionException wrapping the
+ // exception thrown by the function, so we add an extra
CompletionException here to
+ // maintain the KafkaFuture behaviour.
+ throw new CompletionException(t);
+ } else {
+ throw t;
+ }
+ }
+ });
+ return new KafkaFutureImpl<>(true,
toKafkaCompletableFuture(appliedFuture));
}
- public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T>
function) {
- KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
- futureImpl.addWaiter(new Applicant<>(function, this));
+ private static <U> KafkaCompletableFuture<U>
toKafkaCompletableFuture(CompletableFuture<U> completableFuture) {
+ if (completableFuture instanceof KafkaCompletableFuture) {
+ return (KafkaCompletableFuture<U>) completableFuture;
+ } else {
+ final KafkaCompletableFuture<U> result = new
KafkaCompletableFuture<>();
+ completableFuture.whenComplete((x, y) -> {
+ if (y != null) {
+ result.kafkaCompleteExceptionally(y);
+ } else {
+ result.kafkaComplete(x);
+ }
+ });
+ return result;
+ }
}
/**
* @see KafkaFutureImpl#thenApply(BaseFunction)
+ * @deprecated Since Kafka 3.0.
*/
+ @Deprecated
@Override
public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
return thenApply((BaseFunction<T, R>) function);
}
- private static class WhenCompleteBiConsumer<T> implements BiConsumer<T,
Throwable> {
- private final KafkaFutureImpl<T> future;
- private final BiConsumer<? super T, ? super Throwable> biConsumer;
-
- WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super
T, ? super Throwable> biConsumer) {
- this.future = future;
- this.biConsumer = biConsumer;
- }
-
- @Override
- public void accept(T val, Throwable exception) {
+ @Override
+ public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super
Throwable> biConsumer) {
+ CompletableFuture<T> tCompletableFuture =
completableFuture.whenComplete((java.util.function.BiConsumer<? super T, ?
super Throwable>) (a, b) -> {
try {
- if (exception != null) {
- biConsumer.accept(null, exception);
+ biConsumer.accept(a, b);
+ } catch (Throwable t) {
+ if (t instanceof CompletionException) {
+ throw new CompletionException(t);
} else {
- biConsumer.accept(val, null);
- }
- } catch (Throwable e) {
- if (exception == null) {
- exception = e;
+ throw t;
}
}
- if (exception != null) {
- future.completeExceptionally(exception);
- } else {
- future.complete(val);
- }
- }
+ });
+ return new KafkaFutureImpl<>(true,
toKafkaCompletableFuture(tCompletableFuture));
}
- @Override
- public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super
Throwable> biConsumer) {
- final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
- addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
- return future;
- }
-
- protected synchronized void addWaiter(BiConsumer<? super T, ? super
Throwable> action) {
- if (exception != null) {
- action.accept(null, exception);
- } else if (done) {
- action.accept(value, null);
- } else {
- waiters.add(action);
- }
- }
@Override
- public synchronized boolean complete(T newValue) {
- List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
- synchronized (this) {
- if (done)
- return false;
- value = newValue;
- done = true;
- oldWaiters = waiters;
- waiters = null;
- }
- for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
- waiter.accept(newValue, null);
- }
- return true;
+ public boolean complete(T newValue) {
+ return completableFuture.kafkaComplete(newValue);
}
@Override
public boolean completeExceptionally(Throwable newException) {
- List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
- synchronized (this) {
- if (done)
- return false;
- exception = newException;
- done = true;
- oldWaiters = waiters;
- waiters = null;
- }
- for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
- waiter.accept(null, newException);
- }
- return true;
+ // CompletableFuture#get() always wraps the _cause_ of a
CompletionException in ExecutionException
+ // (which KafkaFuture does not) so wrap CompletionException in an
extra one to avoid losing the
+ // first CompletionException in the exception chain.
+ return completableFuture.kafkaCompleteExceptionally(
+ newException instanceof CompletionException ? new
CompletionException(newException) : newException);
}
/**
@@ -246,8 +137,23 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
* CompletionException caused by this CancellationException.
*/
@Override
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
- return completeExceptionally(new CancellationException()) || exception
instanceof CancellationException;
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return completableFuture.cancel(mayInterruptIfRunning);
+ }
+
+ /**
+ * We need to deal with differences between KafkaFuture's historic API and
the API of CompletableFuture:
+ * CompletableFuture#get() does not wrap CancellationException in
ExecutionException (nor does KafkaFuture).
+ * CompletableFuture#get() always wraps the _cause_ of a
CompletionException in ExecutionException
+ * (which KafkaFuture does not).
+ *
+ * The semantics for KafkaFuture are that all exceptional completions of
the future (via #completeExceptionally()
+ * or exceptions from dependants) manifest as ExecutionException, as
observed via both get() and getNow().
+ */
+ private void maybeThrowCancellationException(Throwable cause) {
+ if (cause instanceof CancellationException) {
+ throw (CancellationException) cause;
+ }
}
/**
@@ -255,9 +161,12 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
*/
@Override
public T get() throws InterruptedException, ExecutionException {
- SingleWaiter<T> waiter = new SingleWaiter<>();
- addWaiter(waiter);
- return waiter.await();
+ try {
+ return completableFuture.get();
+ } catch (ExecutionException e) {
+ maybeThrowCancellationException(e.getCause());
+ throw e;
+ }
}
/**
@@ -267,9 +176,12 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException,
TimeoutException {
- SingleWaiter<T> waiter = new SingleWaiter<>();
- addWaiter(waiter);
- return waiter.await(timeout, unit);
+ try {
+ return completableFuture.get(timeout, unit);
+ } catch (ExecutionException e) {
+ maybeThrowCancellationException(e.getCause());
+ throw e;
+ }
}
/**
@@ -277,40 +189,69 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
* the given valueIfAbsent.
*/
@Override
- public synchronized T getNow(T valueIfAbsent) throws InterruptedException,
ExecutionException {
- if (exception != null)
- wrapAndThrow(exception);
- if (done)
- return value;
- return valueIfAbsent;
+ public T getNow(T valueIfAbsent) throws ExecutionException {
+ try {
+ return completableFuture.getNow(valueIfAbsent);
+ } catch (CompletionException e) {
+ maybeThrowCancellationException(e.getCause());
+ // Note, unlike CompletableFuture#get() which throws
ExecutionException, CompletableFuture#getNow()
+ // throws CompletionException, thus needs rewrapping to conform to
KafkaFuture API,
+ // where KafkaFuture#getNow() throws ExecutionException.
+ throw new ExecutionException(e.getCause());
+ }
}
/**
* Returns true if this CompletableFuture was cancelled before it
completed normally.
*/
@Override
- public synchronized boolean isCancelled() {
- return exception instanceof CancellationException;
+ public boolean isCancelled() {
+ if (isDependant) {
+ // Having isCancelled() for a dependent future just return
+ // CompletableFuture.isCancelled() would break the historical
KafkaFuture behaviour because
+ // CompletableFuture#isCancelled() just checks for the exception
being CancellationException
+ // whereas it will be a CompletionException wrapping a
CancellationException
+ // due needing to compensate for CompletableFuture's
CompletionException unwrapping
+ // shenanigans in other methods.
+ try {
+ completableFuture.getNow(null);
+ return false;
+ } catch (Exception e) {
+ return e instanceof CompletionException
+ && e.getCause() instanceof CancellationException;
+ }
+ } else {
+ return completableFuture.isCancelled();
+ }
}
/**
* Returns true if this CompletableFuture completed exceptionally, in any
way.
*/
@Override
- public synchronized boolean isCompletedExceptionally() {
- return exception != null;
+ public boolean isCompletedExceptionally() {
+ return completableFuture.isCompletedExceptionally();
}
/**
* Returns true if completed in any fashion: normally, exceptionally, or
via cancellation.
*/
@Override
- public synchronized boolean isDone() {
- return done;
+ public boolean isDone() {
+ return completableFuture.isDone();
}
@Override
public String toString() {
- return String.format("KafkaFuture{value=%s,exception=%s,done=%b}",
value, exception, done);
+ T value = null;
+ Throwable exception = null;
+ try {
+ value = completableFuture.getNow(null);
+ } catch (CompletionException e) {
+ exception = e.getCause();
+ } catch (Exception e) {
+ exception = e;
+ }
+ return String.format("KafkaFuture{value=%s,exception=%s,done=%b}",
value, exception, exception != null || value != null);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 6fc91ea..0218ce1 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -17,15 +17,24 @@
package org.apache.kafka.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -38,47 +47,197 @@ import static
org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(120)
public class KafkaFutureTest {
+ /** Asserts that the given future is done, didn't fail and wasn't
cancelled. */
+ private void assertIsSuccessful(KafkaFuture<?> future) {
+ assertTrue(future.isDone());
+ assertFalse(future.isCompletedExceptionally());
+ assertFalse(future.isCancelled());
+ }
+
+ /** Asserts that the given future is done, failed and wasn't cancelled. */
+ private void assertIsFailed(KafkaFuture<?> future) {
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ assertTrue(future.isCompletedExceptionally());
+ }
+
+ /** Asserts that the given future is done, didn't fail and was cancelled.
*/
+ private void assertIsCancelled(KafkaFuture<?> future) {
+ assertTrue(future.isDone());
+ assertTrue(future.isCancelled());
+ assertTrue(future.isCompletedExceptionally());
+ assertThrows(CancellationException.class, () -> future.getNow(null));
+ assertThrows(CancellationException.class, () -> future.get(0,
TimeUnit.MILLISECONDS));
+ }
+
+ private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+ T expectedResult,
+ T alternativeValue) {
+ assertNotEquals(expectedResult, alternativeValue);
+ try {
+ assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+ } catch (Exception e) {
+ throw new AssertionError("Unexpected exception", e);
+ }
+ try {
+ assertEquals(expectedResult, future.get());
+ } catch (Exception e) {
+ throw new AssertionError("Unexpected exception", e);
+ }
+ try {
+ assertEquals(expectedResult, future.getNow(alternativeValue));
+ } catch (Exception e) {
+ throw new AssertionError("Unexpected exception", e);
+ }
+ }
+
+ private Throwable awaitAndAssertFailure(KafkaFuture<?> future,
+ Class<? extends Throwable>
expectedException,
+ String expectedMessage) {
+ ExecutionException executionException =
assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.MINUTES));
+ assertEquals(expectedException,
executionException.getCause().getClass());
+ assertEquals(expectedMessage,
executionException.getCause().getMessage());
+
+ executionException = assertThrows(ExecutionException.class, () ->
future.get());
+ assertEquals(expectedException,
executionException.getCause().getClass());
+ assertEquals(expectedMessage,
executionException.getCause().getMessage());
+
+ executionException = assertThrows(ExecutionException.class, () ->
future.getNow(null));
+ assertEquals(expectedException,
executionException.getCause().getClass());
+ assertEquals(expectedMessage,
executionException.getCause().getMessage());
+ return executionException.getCause();
+ }
+
+ private void awaitAndAssertCancelled(KafkaFuture<?> future, String
expectedMessage) {
+ CancellationException cancellationException =
assertThrows(CancellationException.class, () -> future.get(5,
TimeUnit.MINUTES));
+ assertEquals(expectedMessage, cancellationException.getMessage());
+ assertEquals(CancellationException.class,
cancellationException.getClass());
+
+ cancellationException = assertThrows(CancellationException.class, ()
-> future.get());
+ assertEquals(expectedMessage, cancellationException.getMessage());
+ assertEquals(CancellationException.class,
cancellationException.getClass());
+
+ cancellationException = assertThrows(CancellationException.class, ()
-> future.getNow(null));
+ assertEquals(expectedMessage, cancellationException.getMessage());
+ assertEquals(CancellationException.class,
cancellationException.getClass());
+ }
+
@Test
public void testCompleteFutures() throws Exception {
KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
assertTrue(future123.complete(123));
- assertEquals(Integer.valueOf(123), future123.get());
assertFalse(future123.complete(456));
- assertTrue(future123.isDone());
- assertFalse(future123.isCancelled());
- assertFalse(future123.isCompletedExceptionally());
+ assertFalse(future123.cancel(true));
+ assertEquals(Integer.valueOf(123), future123.get());
+ assertIsSuccessful(future123);
KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+ assertFalse(future456.complete(789));
+ assertFalse(future456.cancel(true));
assertEquals(Integer.valueOf(456), future456.get());
+ assertIsSuccessful(future456);
+ }
+ @Test
+ public void testCompleteFuturesExceptionally() throws Exception {
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
- futureFail.completeExceptionally(new RuntimeException("We require more
vespene gas"));
- ExecutionException e = assertThrows(ExecutionException.class,
futureFail::get);
- assertEquals(RuntimeException.class, e.getCause().getClass());
- assertEquals("We require more vespene gas", e.getCause().getMessage());
+ assertTrue(futureFail.completeExceptionally(new RuntimeException("We
require more vespene gas")));
+ assertIsFailed(futureFail);
+ assertFalse(futureFail.completeExceptionally(new RuntimeException("We
require more minerals")));
+ assertFalse(futureFail.cancel(true));
+
+ ExecutionException executionException =
assertThrows(ExecutionException.class, () -> futureFail.get());
+ assertEquals(RuntimeException.class,
executionException.getCause().getClass());
+ assertEquals("We require more vespene gas",
executionException.getCause().getMessage());
+
+ KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
+ assertTrue(tricky1.completeExceptionally(new CompletionException(new
CancellationException())));
+ assertIsFailed(tricky1);
+ awaitAndAssertFailure(tricky1, CompletionException.class,
"java.util.concurrent.CancellationException");
+ }
+
+ @Test
+ public void testCompleteFuturesViaCancellation() {
+ KafkaFutureImpl<Integer> viaCancel = new KafkaFutureImpl<>();
+ assertTrue(viaCancel.cancel(true));
+ assertIsCancelled(viaCancel);
+ awaitAndAssertCancelled(viaCancel, null);
+
+ KafkaFutureImpl<Integer> viaCancellationException = new
KafkaFutureImpl<>();
+ assertTrue(viaCancellationException.completeExceptionally(new
CancellationException("We require more vespene gas")));
+ assertIsCancelled(viaCancellationException);
+ awaitAndAssertCancelled(viaCancellationException, "We require more
vespene gas");
+ }
+
+ @Test
+ public void testToString() {
+ KafkaFutureImpl<Integer> success = new KafkaFutureImpl<>();
+ assertEquals("KafkaFuture{value=null,exception=null,done=false}",
success.toString());
+ success.complete(12);
+ assertEquals("KafkaFuture{value=12,exception=null,done=true}",
success.toString());
+
+ KafkaFutureImpl<Integer> failure = new KafkaFutureImpl<>();
+ failure.completeExceptionally(new RuntimeException("foo"));
+
assertEquals("KafkaFuture{value=null,exception=java.lang.RuntimeException:
foo,done=true}", failure.toString());
+
+ KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
+ tricky1.completeExceptionally(new CompletionException(new
CancellationException()));
+
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CompletionException:
java.util.concurrent.CancellationException,done=true}", tricky1.toString());
+
+ KafkaFutureImpl<Integer> cancelled = new KafkaFutureImpl<>();
+ cancelled.cancel(true);
+
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CancellationException,done=true}",
cancelled.toString());
}
@Test
public void testCompletingFutures() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread<String> myThread = new CompleterThread<>(future, "You
must construct additional pylons.");
- assertFalse(future.isDone());
- assertFalse(future.isCompletedExceptionally());
- assertFalse(future.isCancelled());
+ assertIsNotCompleted(future);
assertEquals("I am ready", future.getNow("I am ready"));
myThread.start();
- String str = future.get(5, TimeUnit.MINUTES);
- assertEquals("You must construct additional pylons.", str);
- assertEquals("You must construct additional pylons.", future.getNow("I
am ready"));
- assertTrue(future.isDone());
- assertFalse(future.isCompletedExceptionally());
- assertFalse(future.isCancelled());
+ awaitAndAssertResult(future, "You must construct additional pylons.",
"I am ready");
+ assertIsSuccessful(future);
+ myThread.join();
+ assertNull(myThread.testException);
+ }
+
+ @Test
+ public void testCompletingFuturesExceptionally() throws Exception {
+ final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+ CompleterThread<String> myThread = new CompleterThread<>(future, null,
+ new RuntimeException("Ultimate efficiency achieved."));
+ assertIsNotCompleted(future);
+ assertEquals("I am ready", future.getNow("I am ready"));
+ myThread.start();
+ awaitAndAssertFailure(future, RuntimeException.class, "Ultimate
efficiency achieved.");
+ assertIsFailed(future);
myThread.join();
assertNull(myThread.testException);
}
@Test
- public void testThenApply() throws Exception {
+ public void testCompletingFuturesViaCancellation() throws Exception {
+ final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+ CompleterThread<String> myThread = new CompleterThread<>(future, null,
+ new CancellationException("Ultimate efficiency achieved."));
+ assertIsNotCompleted(future);
+ assertEquals("I am ready", future.getNow("I am ready"));
+ myThread.start();
+ awaitAndAssertCancelled(future, "Ultimate efficiency achieved.");
+ assertIsCancelled(future);
+ myThread.join();
+ assertNull(myThread.testException);
+ }
+
+ private void assertIsNotCompleted(KafkaFutureImpl<String> future) {
+ assertFalse(future.isDone());
+ assertFalse(future.isCompletedExceptionally());
+ assertFalse(future.isCancelled());
+ }
+
+ @Test
+ public void testThenApplyOnSucceededFuture() throws Exception {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> doubledFuture = future.thenApply(integer -> 2 *
integer);
assertFalse(doubledFuture.isDone());
@@ -90,23 +249,207 @@ public class KafkaFutureTest {
assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
KafkaFuture<Integer> quadrupledFuture = future.thenApply(integer -> 4
* integer);
assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
+ }
- KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
- KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(integer
-> 2 * integer);
- futureFail.completeExceptionally(new RuntimeException());
- assertTrue(futureFail.isCompletedExceptionally());
- assertTrue(futureAppliedFail.isCompletedExceptionally());
+ @Test
+ public void testThenApplyOnFailedFuture() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 *
integer);
+ future.completeExceptionally(new RuntimeException("We require more
vespene gas"));
+ assertIsFailed(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(future, RuntimeException.class, "We require more
vespene gas");
+ awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We
require more vespene gas");
+ }
+
+ @Test
+ public void testThenApplyOnFailedFutureTricky() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 *
integer);
+ future.completeExceptionally(new CompletionException(new
RuntimeException("We require more vespene gas")));
+ assertIsFailed(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(future, CompletionException.class,
"java.lang.RuntimeException: We require more vespene gas");
+ awaitAndAssertFailure(dependantFuture, CompletionException.class,
"java.lang.RuntimeException: We require more vespene gas");
+ }
+
+ @Test
+ public void testThenApplyOnFailedFutureTricky2() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 *
integer);
+ future.completeExceptionally(new CompletionException(new
CancellationException()));
+ assertIsFailed(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(future, CompletionException.class,
"java.util.concurrent.CancellationException");
+ awaitAndAssertFailure(dependantFuture, CompletionException.class,
"java.util.concurrent.CancellationException");
+ }
+
+ @Test
+ public void testThenApplyOnSucceededFutureAndFunctionThrows() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+ throw new RuntimeException("We require more vespene gas");
+ });
+ future.complete(21);
+ assertIsSuccessful(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertResult(future, 21, null);
+ awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We
require more vespene gas");
+ }
+
+ @Test
+ public void
testThenApplyOnSucceededFutureAndFunctionThrowsCompletionException() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+ throw new CompletionException(new RuntimeException("We require
more vespene gas"));
+ });
+ future.complete(21);
+ assertIsSuccessful(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertResult(future, 21, null);
+ Throwable cause = awaitAndAssertFailure(dependantFuture,
CompletionException.class, "java.lang.RuntimeException: We require more vespene
gas");
+ assertTrue(cause.getCause() instanceof RuntimeException);
+ assertEquals(cause.getCause().getMessage(), "We require more vespene
gas");
+ }
+
+ @Test
+ public void testThenApplyOnFailedFutureFunctionNotCalled() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
+ // Because the top level future failed, this should never be
called.
+ ran[0] = true;
+ return null;
+ });
+ future.completeExceptionally(new RuntimeException("We require more
minerals"));
+ assertIsFailed(future);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(future, RuntimeException.class, "We require more
minerals");
+ awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We
require more minerals");
+ assertFalse(ran[0]);
+ }
+
+ @Test
+ public void testThenApplyOnCancelledFuture() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 *
integer);
+ future.cancel(true);
+ assertIsCancelled(future);
+ assertIsCancelled(dependantFuture);
+ awaitAndAssertCancelled(future, null);
+ awaitAndAssertCancelled(dependantFuture, null);
+ }
+
+ @Test
+ public void testWhenCompleteOnSucceededFuture() throws Throwable {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ Throwable[] err = new Throwable[1];
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.whenComplete((integer,
ex) -> {
+ ran[0] = true;
+ try {
+ assertEquals(Integer.valueOf(21), integer);
+ if (ex != null) {
+ throw ex;
+ }
+ } catch (Throwable e) {
+ err[0] = e;
+ }
+ });
+ assertFalse(dependantFuture.isDone());
+ assertTrue(future.complete(21));
+ assertTrue(ran[0]);
+ if (err[0] != null) {
+ throw err[0];
+ }
+ }
+
+ @Test
+ public void testWhenCompleteOnFailedFuture() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ Throwable[] err = new Throwable[1];
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.whenComplete((integer,
ex) -> {
+ ran[0] = true;
+ err[0] = ex;
+ if (integer != null) {
+ err[0] = new AssertionError();
+ }
+ });
+ assertFalse(dependantFuture.isDone());
+ RuntimeException ex = new RuntimeException("We require more vespene
gas");
+ assertTrue(future.completeExceptionally(ex));
+ assertTrue(ran[0]);
+ assertEquals(err[0], ex);
+ }
+
+ @Test
+ public void testWhenCompleteOnSucceededFutureAndConsumerThrows() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.whenComplete((integer,
ex) -> {
+ ran[0] = true;
+ throw new RuntimeException("We require more minerals");
+ });
+ assertFalse(dependantFuture.isDone());
+ assertTrue(future.complete(21));
+ assertIsSuccessful(future);
+ assertTrue(ran[0]);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We
require more minerals");
+ }
+
+ @Test
+ public void testWhenCompleteOnFailedFutureAndConsumerThrows() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.whenComplete((integer,
ex) -> {
+ ran[0] = true;
+ throw new RuntimeException("We require more minerals");
+ });
+ assertFalse(dependantFuture.isDone());
+ assertTrue(future.completeExceptionally(new RuntimeException("We
require more vespene gas")));
+ assertIsFailed(future);
+ assertTrue(ran[0]);
+ assertIsFailed(dependantFuture);
+ awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We
require more vespene gas");
+ }
+
+ @Test
+ public void testWhenCompleteOnCancelledFuture() {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ Throwable[] err = new Throwable[1];
+ boolean[] ran = {false};
+ KafkaFuture<Integer> dependantFuture = future.whenComplete((integer,
ex) -> {
+ ran[0] = true;
+ err[0] = ex;
+ if (integer != null) {
+ err[0] = new AssertionError();
+ }
+ });
+ assertFalse(dependantFuture.isDone());
+ assertTrue(future.cancel(true));
+ assertTrue(ran[0]);
+ assertTrue(err[0] instanceof CancellationException);
}
private static class CompleterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
private final T value;
+ private final Throwable exception;
Throwable testException = null;
CompleterThread(KafkaFutureImpl<T> future, T value) {
this.future = future;
this.value = value;
+ this.exception = null;
+ }
+
+ CompleterThread(KafkaFutureImpl<T> future, T value, Exception
exception) {
+ this.future = future;
+ this.value = value;
+ this.exception = exception;
}
@Override
@@ -116,7 +459,11 @@ public class KafkaFutureTest {
Thread.sleep(0, 200);
} catch (InterruptedException e) {
}
- future.complete(value);
+ if (exception == null) {
+ future.complete(value);
+ } else {
+ future.completeExceptionally(exception);
+ }
} catch (Throwable testException) {
this.testException = testException;
}
@@ -153,8 +500,8 @@ public class KafkaFutureTest {
futures.add(new KafkaFutureImpl<>());
}
KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new
KafkaFuture[0]));
- final List<CompleterThread> completerThreads = new ArrayList<>();
- final List<WaiterThread> waiterThreads = new ArrayList<>();
+ final List<CompleterThread<Integer>> completerThreads = new
ArrayList<>();
+ final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
completerThreads.add(new CompleterThread<>(futures.get(i), i));
waiterThreads.add(new WaiterThread<>(futures.get(i), i));
@@ -169,7 +516,7 @@ public class KafkaFutureTest {
assertFalse(allFuture.isDone());
completerThreads.get(numThreads - 1).start();
allFuture.get();
- assertTrue(allFuture.isDone());
+ assertIsSuccessful(allFuture);
for (int i = 0; i < numThreads; i++) {
assertEquals(Integer.valueOf(i), futures.get(i).get());
}
@@ -182,6 +529,52 @@ public class KafkaFutureTest {
}
@Test
+ public void testAllOfFuturesWithFailure() throws Exception {
+ final int numThreads = 5;
+ final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(new KafkaFutureImpl<>());
+ }
+ KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new
KafkaFuture[0]));
+ final List<CompleterThread<Integer>> completerThreads = new
ArrayList<>();
+ final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
+ int lastIndex = numThreads - 1;
+ for (int i = 0; i < lastIndex; i++) {
+ completerThreads.add(new CompleterThread<>(futures.get(i), i));
+ waiterThreads.add(new WaiterThread<>(futures.get(i), i));
+ }
+ completerThreads.add(new CompleterThread<>(futures.get(lastIndex),
null, new RuntimeException("Last one failed")));
+ waiterThreads.add(new WaiterThread<>(futures.get(lastIndex),
lastIndex));
+ assertFalse(allFuture.isDone());
+ for (int i = 0; i < numThreads; i++) {
+ waiterThreads.get(i).start();
+ }
+ for (int i = 0; i < lastIndex; i++) {
+ completerThreads.get(i).start();
+ }
+ assertFalse(allFuture.isDone());
+ completerThreads.get(lastIndex).start();
+ awaitAndAssertFailure(allFuture, RuntimeException.class, "Last one
failed");
+ assertIsFailed(allFuture);
+ for (int i = 0; i < lastIndex; i++) {
+ assertEquals(Integer.valueOf(i), futures.get(i).get());
+ }
+ assertIsFailed(futures.get(lastIndex));
+ for (int i = 0; i < numThreads; i++) {
+ completerThreads.get(i).join();
+ waiterThreads.get(i).join();
+ assertNull(completerThreads.get(i).testException);
+ if (i == lastIndex) {
+ assertEquals(ExecutionException.class,
waiterThreads.get(i).testException.getClass());
+ assertEquals(RuntimeException.class,
waiterThreads.get(i).testException.getCause().getClass());
+ assertEquals("Last one failed",
waiterThreads.get(i).testException.getCause().getMessage());
+ } else {
+ assertNull(waiterThreads.get(i).testException);
+ }
+ }
+ }
+
+ @Test
public void testAllOfFuturesHandlesZeroFutures() throws Exception {
KafkaFuture<Void> allFuture = KafkaFuture.allOf();
assertTrue(allFuture.isDone());
@@ -196,4 +589,60 @@ public class KafkaFutureTest {
assertThrows(TimeoutException.class, () -> future.get(0,
TimeUnit.MILLISECONDS));
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLeakCompletableFuture() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
+ final KafkaFutureImpl<String> kfut = new KafkaFutureImpl<>();
+ CompletableFuture<String> comfut =
kfut.toCompletionStage().toCompletableFuture();
+ assertThrows(UnsupportedOperationException.class, () ->
comfut.complete(""));
+ assertThrows(UnsupportedOperationException.class, () ->
comfut.completeExceptionally(new RuntimeException()));
+ // Annoyingly CompletableFuture added some more methods in Java 9, but
the tests need to run on Java 8
+ // so test reflectively
+ if (Java.IS_JAVA9_COMPATIBLE) {
+ Method completeOnTimeout =
CompletableFuture.class.getDeclaredMethod("completeOnTimeout", Object.class,
Long.TYPE, TimeUnit.class);
+ assertThrows(UnsupportedOperationException.class, () -> {
+ try {
+ completeOnTimeout.invoke(comfut, "", 1L,
TimeUnit.MILLISECONDS);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ });
+
+ Method completeAsync =
CompletableFuture.class.getDeclaredMethod("completeAsync", Supplier.class);
+ assertThrows(UnsupportedOperationException.class, () -> {
+ try {
+ completeAsync.invoke(comfut, (Supplier<String>) () -> "");
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ });
+
+ Method obtrudeValue =
CompletableFuture.class.getDeclaredMethod("obtrudeValue", Object.class);
+ assertThrows(UnsupportedOperationException.class, () -> {
+ try {
+ obtrudeValue.invoke(comfut, "");
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ });
+
+ Method obtrudeException =
CompletableFuture.class.getDeclaredMethod("obtrudeException", Throwable.class);
+ assertThrows(UnsupportedOperationException.class, () -> {
+ try {
+ obtrudeException.invoke(comfut, new RuntimeException());
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ });
+
+ // Check the CF from a minimal CompletionStage doesn't cause
completion of the original KafkaFuture
+ Method minimal =
CompletableFuture.class.getDeclaredMethod("minimalCompletionStage");
+ CompletionStage<String> cs = (CompletionStage<String>)
minimal.invoke(comfut);
+ cs.toCompletableFuture().complete("");
+
+ assertFalse(kfut.isDone());
+ assertFalse(comfut.isDone());
+ }
+ }
+
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index b86226c..878cd01 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -99,6 +99,12 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/>
</Match>
+ <!-- false positive, see https://github.com/spotbugs/spotbugs/issues/1001
-->
+ <Match>
+ <Class name="org.apache.kafka.common.internals.KafkaFutureImpl"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+ </Match>
+
<Match>
<!-- Suppression for the equals() for extension methods. -->
<Class name="kafka.api.package$ElectLeadersRequestOps"/>