JAMES−2105 Correct on the fly migration when attachments

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ea823c64
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ea823c64
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ea823c64

Branch: refs/heads/master
Commit: ea823c645e99f2d355d863fc7593eb396eaea863
Parents: d640a48
Author: benwa <btell...@linagora.com>
Authored: Mon Jul 31 15:17:02 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Tue Aug 1 12:46:04 2017 +0700

----------------------------------------------------------------------
 .../mail/MessageAttachmentRepresentation.java   | 11 ++++++
 .../mail/migration/V1ToV2Migration.java         | 10 ++++-
 .../mail/migration/V1ToV2MigrationThread.java   |  9 +++--
 .../mail/migration/V1ToV2MigrationTest.java     | 41 ++++++++++++++++++++
 4 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
index 838ac56..172c550 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java
@@ -23,6 +23,8 @@ import java.util.Optional;
 
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.util.OptionalConverter;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -35,6 +37,15 @@ public class MessageAttachmentRepresentation {
         return new Builder();
     }
 
+    public static MessageAttachmentRepresentation 
fromAttachment(MessageAttachment attachment) {
+        return builder()
+            .attachmentId(attachment.getAttachmentId())
+            .cid(OptionalConverter.fromGuava(attachment.getCid()))
+            .isInline(attachment.isInline())
+            .name(attachment.getName().orNull())
+            .build();
+    }
+
     public static class Builder {
 
         private AttachmentId attachmentId;

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 7cb85bf..3d50f84 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.migration;
 
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 public class V1ToV2Migration implements Migration {
@@ -55,7 +57,7 @@ public class V1ToV2Migration implements Migration {
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
     private final ExecutorService migrationExecutor;
-    private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+    private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, 
List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
 
     @Inject
     public V1ToV2Migration(CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2,
@@ -99,11 +101,15 @@ public class V1ToV2Migration implements Migration {
 
     private Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> 
submitMigration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
         if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
+            Pair<MessageWithoutAttachment, 
List<MessageAttachmentRepresentation>> messageV1WithAttachmentCopied =
+                Pair.of(messageV1.getLeft(), 
messageV1.getRight().collect(Guavate.toImmutableList()));
             synchronized (messagesToBeMigrated) {
-                if (!messagesToBeMigrated.offer(messageV1)) {
+
+                if 
(!messagesToBeMigrated.offer(messageV1WithAttachmentCopied)) {
                     LOGGER.info("Migration queue is full message {} is 
ignored", messageV1.getLeft().getMessageId());
                 }
             }
+            return Pair.of(messageV1.getLeft(), 
messageV1WithAttachmentCopied.getRight().stream());
         }
         return messageV1;
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
index 1b96179..c65a521 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.migration;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -41,12 +42,12 @@ public class V1ToV2MigrationThread implements Runnable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(V1ToV2MigrationThread.class);
 
-    private final BlockingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+    private final BlockingQueue<Pair<MessageWithoutAttachment, 
List<MessageAttachmentRepresentation>>> messagesToBeMigrated;
     private final CassandraMessageDAO messageDAOV1;
     private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
 
-    public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated,
+    public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, 
List<MessageAttachmentRepresentation>>> messagesToBeMigrated,
                                  CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) {
         this.messagesToBeMigrated = messagesToBeMigrated;
         this.messageDAOV1 = messageDAOV1;
@@ -58,8 +59,8 @@ public class V1ToV2MigrationThread implements Runnable {
     public void run() {
         while (true) {
             try {
-                Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
-                performV1ToV2Migration(message).join();
+                Pair<MessageWithoutAttachment, 
List<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take();
+                performV1ToV2Migration(Pair.of(message.getLeft(), 
message.getRight().stream())).join();
             } catch (Exception e) {
                 LOGGER.error("Error occured in migration thread", e);
             }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 5825a92..8cce8fc 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -24,11 +24,14 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
@@ -40,6 +43,7 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
 import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
+import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
 import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
@@ -61,6 +65,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.jayway.awaitility.Awaitility;
@@ -236,6 +241,42 @@ public class V1ToV2MigrationTest {
             .build());
     }
 
+    @Test
+    public void migratedDataShouldBeRetrievedNoAttachment() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of());
+
+        messageDAOV1.save(originalMessage).join();
+
+        Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> migratedData =
+            
testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+        awaitMigration();
+
+        
softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+            .isEmpty();
+    }
+
+    @Test
+    public void migratedDataShouldBeRetrievedWhenAttachment() throws Exception 
{
+        SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of(messageAttachment));
+
+        attachmentMapper.storeAttachment(attachment);
+
+        messageDAOV1.save(originalMessage).join();
+
+        Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> migratedData =
+            
testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
+
+        awaitMigration();
+
+        
softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList()))
+            
.containsOnly(MessageAttachmentRepresentation.fromAttachment(messageAttachment));
+    }
+
     private void awaitMigration() {
         awaitability.atMost(1, TimeUnit.MINUTES)
             .until(() -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to