Repository: james-project
Updated Branches:
  refs/heads/master 1998ff35a -> daaa36a02


MAILBOX-304 Makes collecting FluentFutureStream easier


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6ad3c7e4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6ad3c7e4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6ad3c7e4

Branch: refs/heads/master
Commit: 6ad3c7e4e8141270f3962f706fd0af234737b4ff
Parents: fd1c775
Author: benwa <btell...@linagora.com>
Authored: Wed Sep 6 15:49:16 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Wed Sep 6 17:55:48 2017 +0700

----------------------------------------------------------------------
 .../mail/CassandraAttachmentMapper.java         | 13 ++++++------
 .../mail/CassandraMessageIdMapper.java          |  3 +--
 .../apache/james/util/FluentFutureStream.java   |  9 ++++++++
 .../james/util/FluentFutureStreamTest.java      | 22 ++++++++++++++++++++
 4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index f1e79d5..a385751 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -28,6 +28,7 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.
 import static 
org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.SIZE;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TABLE_NAME;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.TYPE;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -35,6 +36,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -45,6 +47,8 @@ import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.store.mail.AttachmentMapper;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
@@ -52,8 +56,7 @@ import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.ThrownByLambdaException;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableList;
 
 public class CassandraAttachmentMapper implements AttachmentMapper {
 
@@ -99,7 +102,7 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
         return getAttachmentsAsFuture(attachmentIds).join();
     }
 
-    public CompletableFuture<List<Attachment>> 
getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
+    public CompletableFuture<ImmutableList<Attachment>> 
getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
 
         Stream<CompletableFuture<Optional<Attachment>>> attachments = 
attachmentIds
@@ -110,9 +113,7 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
         return FluentFutureStream
             .of(attachments)
             .flatMap(OptionalUtils::toStream)
-            .completableFuture()
-            .thenApply(stream ->
-                stream.collect(Guavate.toImmutableList()));
+            .collect(Guavate.toImmutableList());
     }
 
     private CompletableFuture<Optional<Attachment>> 
getAttachmentAsFuture(AttachmentId attachmentId) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 494cf91..ff9fb69 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -97,8 +97,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         return FluentFutureStream.ofNestedStreams(
             messageIds.stream()
                 .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) 
messageId, Optional.empty())))
-            .completableFuture()
-            .thenApply(stream -> stream.collect(Guavate.toImmutableList()))
+            .collect(Guavate.toImmutableList())
             .thenCompose(composedMessageIds -> 
messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
             .thenApply(stream -> stream
                 .filter(CassandraMessageDAO.MessageResult::isFound)

http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
index 530bc09..9dcae7a 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collector;
 import java.util.stream.Stream;
 
 public class FluentFutureStream<T> {
@@ -174,6 +175,14 @@ public class FluentFutureStream<T> {
     }
 
     /**
+     * Returns the future of the underlying collected stream.
+     */
+    public <C> CompletableFuture<C> collect(Collector<T, ?, C> collector) {
+        return this.completableFuture
+            .thenApply(stream -> stream.collect(collector));
+    }
+
+    /**
      * Join and returns the underlying stream.
      */
     public Stream<T> join() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/6ad3c7e4/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
 
b/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
index 6bc692c..d6fb05c 100644
--- 
a/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
+++ 
b/server/container/util-java8/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
@@ -233,4 +233,26 @@ public class FluentFutureStreamTest {
         assertThat(sideEffects).containsOnly(1, 2, 3);
     }
 
+    @Test
+    public void collectShouldReturnTheCollectionOfData() {
+        assertThat(
+            FluentFutureStream.of(
+                Stream.of(
+                    CompletableFuture.completedFuture(1),
+                    CompletableFuture.completedFuture(2),
+                    CompletableFuture.completedFuture(3)))
+                .collect(Guavate.toImmutableList())
+                .join())
+            .containsExactly(1, 2, 3);
+    }
+
+    @Test
+    public void collectShouldReturnEmptyWhenSteamIsEmpty() {
+        assertThat(
+            FluentFutureStream.ofFutures()
+                .collect(Guavate.toImmutableList())
+                .join())
+            .isEmpty();
+    }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to