JAMES-1968 Enhance CompletableFutureUtils
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/73c5554a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/73c5554a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/73c5554a Branch: refs/heads/master Commit: 73c5554ad69989c3e67fb015b191cd674382fb7a Parents: 39d5747 Author: benwa <btell...@linagora.com> Authored: Thu Mar 16 22:03:02 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Wed Mar 22 07:10:55 2017 +0700 ---------------------------------------------------------------------- .../james/util/CompletableFutureUtil.java | 25 +++- .../apache/james/util/FluentFutureStream.java | 16 +++ .../james/util/CompletableFutureUtilTest.java | 121 +++++++++++++++++++ 3 files changed, 159 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/73c5554a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java index ba4f30a..c18217c 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -19,13 +19,20 @@ package org.apache.james.util; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; public class CompletableFutureUtil { + @SafeVarargs + public static <T> CompletableFuture<Stream<T>> allOfArray(CompletableFuture<T>... futures) { + return allOf(Stream.of(futures)); + } + public static <T> CompletableFuture<Stream<T>> allOf(Stream<CompletableFuture<T>> futureStream) { return futureStream .map((CompletableFuture<T> future) -> future.thenApply(Stream::of)) @@ -51,17 +58,29 @@ public class CompletableFutureUtil { return futurStream .thenCompose(stream -> CompletableFutureUtil.allOf( - stream.map(action::apply))); + stream.map(action))); } public static <T, U> CompletableFuture<Stream<U>> map(CompletableFuture<Stream<T>> futurStream, Function<T, U> action) { return futurStream .thenApply(stream -> - stream.map(value -> - action.apply(value))); + stream.map(action)); + } + + public static <T, U> CompletableFuture<Optional<T>> reduce(BinaryOperator<T> binaryOperator, CompletableFuture<Stream<T>> futureStream) { + return futureStream.thenApply(stream -> stream.reduce(binaryOperator)); } public static <T> CompletableFuture<T> keepValue(Supplier<CompletableFuture<Void>> supplier, T value) { return supplier.get().thenApply(any -> value); } + + public static <T> Function<Boolean, CompletableFuture<Boolean>> composeIfTrue(Supplier<CompletableFuture<T>> composeOperation) { + return b -> { + if (b) { + return composeOperation.get().thenApply(any -> b); + } + return CompletableFuture.completedFuture(b); + }; + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/73c5554a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java index 85e6f03..7864715 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -19,7 +19,10 @@ package org.apache.james.util; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.stream.Stream; @@ -31,6 +34,15 @@ public class FluentFutureStream<T> { return new FluentFutureStream<>(completableFuture); } + public static <T> FluentFutureStream<T> of(Stream<CompletableFuture<T>> completableFutureStream) { + return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream)); + } + + @SafeVarargs + public static <T> FluentFutureStream<T> ofFutures(CompletableFuture<T>... completableFutures) { + return new FluentFutureStream<>(CompletableFutureUtil.allOfArray(completableFutures)); + } + private FluentFutureStream(CompletableFuture<Stream<T>> completableFuture) { this.completableFuture = completableFuture; } @@ -45,6 +57,10 @@ public class FluentFutureStream<T> { CompletableFutureUtil.map(completableFuture(), function)); } + public CompletableFuture<Optional<T>> reduce(BinaryOperator<T> combiner) { + return CompletableFutureUtil.reduce(combiner, completableFuture); + } + public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<U>> function) { return FluentFutureStream.of( CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)); http://git-wip-us.apache.org/repos/asf/james-project/blob/73c5554a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index 31eb3f4..8035b50 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -89,6 +89,59 @@ public class CompletableFutureUtilTest { } @Test + public void allOfArrayShouldPreserveOrder() { + long value1 = 18L; + long value2 = 19L; + long value3 = 20L; + long value4 = 21L; + long value5 = 22L; + long value6 = 23L; + long value7 = 24L; + long value8 = 25L; + long value9 = 26L; + long value10 = 27L; + assertThat( + CompletableFutureUtil.allOfArray( + CompletableFuture.completedFuture(value1), + CompletableFuture.completedFuture(value2), + CompletableFuture.completedFuture(value3), + CompletableFuture.completedFuture(value4), + CompletableFuture.completedFuture(value5), + CompletableFuture.completedFuture(value6), + CompletableFuture.completedFuture(value7), + CompletableFuture.completedFuture(value8), + CompletableFuture.completedFuture(value9), + CompletableFuture.completedFuture(value10)) + .join() + .collect(Guavate.toImmutableList())) + .containsExactly(value1, value2, value3, value4, value5, value6, value7, value8, value9, value10); + } + + @Test + public void allOfArrayShouldUnboxNoArgs() { + assertThat( + CompletableFutureUtil.allOfArray() + .join() + .collect(Guavate.toImmutableList())) + .isEmpty(); + } + + @Test + public void allOfArrayShouldUnboxArray() { + long value1 = 18L; + long value2 = 19L; + long value3 = 20L; + assertThat( + CompletableFutureUtil.allOfArray( + CompletableFuture.completedFuture(value1), + CompletableFuture.completedFuture(value2), + CompletableFuture.completedFuture(value3)) + .join() + .collect(Guavate.toImmutableList())) + .containsOnly(value1, value2, value3); + } + + @Test public void allOfShouldWorkOnVeryLargeStream() { CompletableFutureUtil.allOf( IntStream.range(0, 100000) @@ -172,4 +225,72 @@ public class CompletableFutureUtilTest { .join()) .isNull(); } + + @Test + public void composeIfTrueShouldReturnTrueWhenTrue() { + assertThat( + CompletableFutureUtil.composeIfTrue(() -> CompletableFuture.completedFuture(null)) + .apply(true) + .join()) + .isTrue(); + } + + @Test + public void composeIfTrueShouldReturnFalseWhenFalse() { + assertThat( + CompletableFutureUtil.composeIfTrue(() -> CompletableFuture.completedFuture(null)) + .apply(false) + .join()) + .isFalse(); + } + + @Test + public void composeIfTrueShouldComposeWhenTrue() { + AtomicInteger atomicInteger = new AtomicInteger(0); + CompletableFutureUtil.composeIfTrue(() -> { + atomicInteger.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }) + .apply(true) + .join(); + + assertThat(atomicInteger.get()).isEqualTo(1); + } + + @Test + public void composeIfTrueShouldNotComposeWhenFalse() { + AtomicInteger atomicInteger = new AtomicInteger(0); + CompletableFutureUtil.composeIfTrue(() -> { + atomicInteger.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }) + .apply(false) + .join(); + + assertThat(atomicInteger.get()).isEqualTo(0); + } + + @Test + public void reduceShouldReturnEmptyWhenNoValue() { + assertThat( + CompletableFutureUtil.reduce( + (i, j) -> i + j, + CompletableFutureUtil.<Long>allOfArray()) + .join()) + .isEmpty(); + } + + @Test + public void reduceShouldWork() { + assertThat( + CompletableFutureUtil.reduce( + (i, j) -> i + j, + CompletableFutureUtil.allOfArray( + CompletableFuture.completedFuture(1L), + CompletableFuture.completedFuture(2L), + CompletableFuture.completedFuture(3L) + )) + .join()) + .contains(6L); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org