JAMES-2105 Correct on-the-fly migration when attachments are present
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ea823c64 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ea823c64 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ea823c64 Branch: refs/heads/master Commit: ea823c645e99f2d355d863fc7593eb396eaea863 Parents: d640a48 Author: benwa <btell...@linagora.com> Authored: Mon Jul 31 15:17:02 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Tue Aug 1 12:46:04 2017 +0700 ---------------------------------------------------------------------- .../mail/MessageAttachmentRepresentation.java | 11 ++++++ .../mail/migration/V1ToV2Migration.java | 10 ++++- .../mail/migration/V1ToV2MigrationThread.java | 9 +++-- .../mail/migration/V1ToV2MigrationTest.java | 41 ++++++++++++++++++++ 4 files changed, 65 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java index 838ac56..172c550 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageAttachmentRepresentation.java @@ -23,6 +23,8 @@ import java.util.Optional; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.Cid; +import org.apache.james.mailbox.model.MessageAttachment; +import org.apache.james.util.OptionalConverter; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -35,6 +37,15 @@ public class MessageAttachmentRepresentation { return new Builder(); } + public static MessageAttachmentRepresentation fromAttachment(MessageAttachment attachment) { + return builder() + .attachmentId(attachment.getAttachmentId()) + .cid(OptionalConverter.fromGuava(attachment.getCid())) + .isInline(attachment.isInline()) + .name(attachment.getName().orNull()) + .build(); + } + public static class Builder { private AttachmentId attachmentId; http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index 7cb85bf..3d50f84 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail.migration; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; public class V1ToV2Migration implements Migration { @@ -55,7 +57,7 @@ public class V1ToV2Migration implements Migration { private final AttachmentLoader attachmentLoader; private final CassandraConfiguration cassandraConfiguration; private final ExecutorService migrationExecutor; - 
private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated; + private final ArrayBlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated; @Inject public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, @@ -99,11 +101,15 @@ public class V1ToV2Migration implements Migration { private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> submitMigration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) { + Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> messageV1WithAttachmentCopied = + Pair.of(messageV1.getLeft(), messageV1.getRight().collect(Guavate.toImmutableList())); synchronized (messagesToBeMigrated) { - if (!messagesToBeMigrated.offer(messageV1)) { + + if (!messagesToBeMigrated.offer(messageV1WithAttachmentCopied)) { LOGGER.info("Migration queue is full message {} is ignored", messageV1.getLeft().getMessageId()); } } + return Pair.of(messageV1.getLeft(), messageV1WithAttachmentCopied.getRight().stream()); } return messageV1; } http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java index 1b96179..c65a521 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java @@ -19,6 +19,7 @@ package 
org.apache.james.mailbox.cassandra.mail.migration; +import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -41,12 +42,12 @@ public class V1ToV2MigrationThread implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2MigrationThread.class); - private final BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated; + private final BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated; private final CassandraMessageDAO messageDAOV1; private final CassandraMessageDAOV2 messageDAOV2; private final AttachmentLoader attachmentLoader; - public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated, + public V1ToV2MigrationThread(BlockingQueue<Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>>> messagesToBeMigrated, CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) { this.messagesToBeMigrated = messagesToBeMigrated; this.messageDAOV1 = messageDAOV1; @@ -58,8 +59,8 @@ public class V1ToV2MigrationThread implements Runnable { public void run() { while (true) { try { - Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take(); - performV1ToV2Migration(message).join(); + Pair<MessageWithoutAttachment, List<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take(); + performV1ToV2Migration(Pair.of(message.getLeft(), message.getRight().stream())).join(); } catch (Exception e) { LOGGER.error("Error occured in migration thread", e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ea823c64/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java 
---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java index 5825a92..8cce8fc 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java @@ -24,11 +24,14 @@ import java.util.Date; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraModuleComposite; @@ -40,6 +43,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2; import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; +import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule; import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule; @@ -61,6 +65,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import com.github.steveash.guavate.Guavate; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.jayway.awaitility.Awaitility; @@ -236,6 +241,42 @@ public class 
V1ToV2MigrationTest { .build()); } + @Test + public void migratedDataShouldBeRetrievedNoAttachment() throws Exception { + SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, + new PropertyBuilder(), ImmutableList.of()); + + messageDAOV1.save(originalMessage).join(); + + Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData = + testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join(); + + awaitMigration(); + + softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList())) + .isEmpty(); + } + + @Test + public void migratedDataShouldBeRetrievedWhenAttachment() throws Exception { + SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, + new PropertyBuilder(), ImmutableList.of(messageAttachment)); + + attachmentMapper.storeAttachment(attachment); + + messageDAOV1.save(originalMessage).join(); + + Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> migratedData = + testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join(); + + awaitMigration(); + + softly.assertThat(migratedData.getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(migratedData.getRight().collect(Guavate.toImmutableList())) + .containsOnly(MessageAttachmentRepresentation.fromAttachment(messageAttachment)); + } + private void awaitMigration() { awaitability.atMost(1, TimeUnit.MINUTES) .until(() -> { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org