Repository: james-project Updated Branches: refs/heads/master 1998ff35a -> daaa36a02
MAILBOX-304 Makes collecting FluentFutureStream easier Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6ad3c7e4 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6ad3c7e4 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6ad3c7e4 Branch: refs/heads/master Commit: 6ad3c7e4e8141270f3962f706fd0af234737b4ff Parents: fd1c775 Author: benwa <btell...@linagora.com> Authored: Wed Sep 6 15:49:16 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Wed Sep 6 17:55:48 2017 +0700 ---------------------------------------------------------------------- .../mail/CassandraAttachmentMapper.java | 13 ++++++------ .../mail/CassandraMessageIdMapper.java | 3 +-- .../apache/james/util/FluentFutureStream.java | 9 ++++++++ .../james/util/FluentFutureStreamTest.java | 22 ++++++++++++++++++++ 4 files changed, 39 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index f1e79d5..a385751 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -28,6 +28,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable. import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.SIZE; import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TYPE; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; + import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -45,6 +47,8 @@ import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.store.mail.AttachmentMapper; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; @@ -52,8 +56,7 @@ import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.ThrownByLambdaException; import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; public class CassandraAttachmentMapper implements AttachmentMapper { @@ -99,7 +102,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { return getAttachmentsAsFuture(attachmentIds).join(); } - public CompletableFuture<List<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) { + public CompletableFuture<ImmutableList<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) { Preconditions.checkArgument(attachmentIds != null); Stream<CompletableFuture<Optional<Attachment>>> attachments = attachmentIds @@ -110,9 +113,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { return FluentFutureStream .of(attachments) .flatMap(OptionalUtils::toStream) - .completableFuture() - .thenApply(stream -> - stream.collect(Guavate.toImmutableList())); + .collect(Guavate.toImmutableList()); } private CompletableFuture<Optional<Attachment>> getAttachmentAsFuture(AttachmentId attachmentId) { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 494cf91..ff9fb69 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -97,8 +97,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { return FluentFutureStream.ofNestedStreams( messageIds.stream() .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))) - .completableFuture() - .thenApply(stream -> stream.collect(Guavate.toImmutableList())) + .collect(Guavate.toImmutableList()) .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) .thenApply(stream -> stream .filter(CassandraMessageDAO.MessageResult::isFound) http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java index 530bc09..9dcae7a 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collector; import java.util.stream.Stream; public class FluentFutureStream<T> { @@ -174,6 +175,14 @@ public class FluentFutureStream<T> { } /** + * Returns the future of the underlying collected stream. + */ + public <C> CompletableFuture<C> collect(Collector<T, ?, C> collector) { + return this.completableFuture + .thenApply(stream -> stream.collect(collector)); + } + + /** * Join and returns the underlying stream. */ public Stream<T> join() { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java index 6bc692c..d6fb05c 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java @@ -233,4 +233,26 @@ public class FluentFutureStreamTest { assertThat(sideEffects).containsOnly(1, 2, 3); } + @Test + public void collectShouldReturnTheCollectionOfData() { + assertThat( + FluentFutureStream.of( + Stream.of( + CompletableFuture.completedFuture(1), + CompletableFuture.completedFuture(2), + CompletableFuture.completedFuture(3))) + .collect(Guavate.toImmutableList()) + .join()) + .containsExactly(1, 2, 3); + } + + @Test + public void collectShouldReturnEmptyWhenSteamIsEmpty() { + assertThat( + FluentFutureStream.ofFutures() + .collect(Guavate.toImmutableList()) + .join()) + .isEmpty(); + } + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org