Repository: james-project Updated Branches: refs/heads/master ea823c645 -> 7c333a50d
JAMES-2096 Limit the number of simultaneusly fetched messages during migration Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7c333a50 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7c333a50 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7c333a50 Branch: refs/heads/master Commit: 7c333a50d85ff43ed64a5931063abf35e7dfeccb Parents: ea823c6 Author: benwa <btell...@linagora.com> Authored: Tue Aug 1 15:11:53 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Tue Aug 1 16:04:48 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageDAO.java | 17 ++++++++++------- .../cassandra/mail/migration/V1ToV2Migration.java | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/7c333a50/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index ba4802e..1c73063 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -97,7 +97,7 @@ public class CassandraMessageDAO { private final PreparedStatement selectHeaders; private final PreparedStatement selectFields; private final PreparedStatement selectBody; - private final PreparedStatement selectAll; + private final PreparedStatement selectByBatch; private CassandraUtils cassandraUtils; private final CassandraConfiguration cassandraConfiguration; @@ -113,7 +113,7 @@ public class CassandraMessageDAO { this.selectFields = prepareSelect(session, FIELDS); this.selectBody = prepareSelect(session, BODY); this.cassandraConfiguration = cassandraConfiguration; - this.selectAll = prepareSelectAll(session); + this.selectByBatch = prepareSelectBatch(session, cassandraConfiguration); this.cassandraUtils = cassandraUtils; } @@ -122,8 +122,9 @@ public class CassandraMessageDAO { this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION, CassandraUtils.WITH_DEFAULT_CONFIGURATION); } - private PreparedStatement prepareSelectAll(Session session) { - return session.prepare(select().from(TABLE_NAME)); + private PreparedStatement prepareSelectBatch(Session session, CassandraConfiguration cassandraConfiguration) { + return session.prepare(select().from(TABLE_NAME) + .limit(cassandraConfiguration.getFetchNextPageInAdvanceRow())); } private PreparedStatement prepareSelect(Session session, String[] fields) { @@ -152,11 +153,13 @@ public class CassandraMessageDAO { .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); } - public Stream<RawMessage> readAll() { + public List<RawMessage> readBatch() { return cassandraUtils.convertToStream( - cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize())) + cassandraAsyncExecutor.execute(selectByBatch.bind() + .setFetchSize(cassandraConfiguration.getV1ReadFetchSize())) .join()) - .map(this::fromRow); + .map(this::fromRow) + .collect(Guavate.toImmutableList()); } public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException { http://git-wip-us.apache.org/repos/asf/james-project/blob/7c333a50/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index 3d50f84..db87065 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -116,7 +116,20 @@ public class V1ToV2Migration implements Migration { @Override public MigrationResult run() { - return messageDAOV1.readAll() + boolean allResultFetched = false; + MigrationResult result = MigrationResult.COMPLETED; + + while (!allResultFetched) { + List<CassandraMessageDAO.RawMessage> batch = messageDAOV1.readBatch(); + allResultFetched = batch.isEmpty(); + result = Migration.combine(result, migrateBatch(batch)); + } + return result; + } + + private MigrationResult migrateBatch(List<CassandraMessageDAO.RawMessage> batch) { + return batch + .stream() .map(this::migrate) .reduce(MigrationResult.COMPLETED, Migration::combine); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org