Repository: james-project
Updated Branches:
  refs/heads/master ea823c645 -> 7c333a50d


JAMES-2096 Limit the number of simultaneusly fetched messages during migration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7c333a50
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7c333a50
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7c333a50

Branch: refs/heads/master
Commit: 7c333a50d85ff43ed64a5931063abf35e7dfeccb
Parents: ea823c6
Author: benwa <btell...@linagora.com>
Authored: Tue Aug 1 15:11:53 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Tue Aug 1 16:04:48 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageDAO.java        | 17 ++++++++++-------
 .../cassandra/mail/migration/V1ToV2Migration.java  | 15 ++++++++++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7c333a50/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index ba4802e..1c73063 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -97,7 +97,7 @@ public class CassandraMessageDAO {
     private final PreparedStatement selectHeaders;
     private final PreparedStatement selectFields;
     private final PreparedStatement selectBody;
-    private final PreparedStatement selectAll;
+    private final PreparedStatement selectByBatch;
     private CassandraUtils cassandraUtils;
     private final CassandraConfiguration cassandraConfiguration;
 
@@ -113,7 +113,7 @@ public class CassandraMessageDAO {
         this.selectFields = prepareSelect(session, FIELDS);
         this.selectBody = prepareSelect(session, BODY);
         this.cassandraConfiguration = cassandraConfiguration;
-        this.selectAll = prepareSelectAll(session);
+        this.selectByBatch = prepareSelectBatch(session, 
cassandraConfiguration);
         this.cassandraUtils = cassandraUtils;
     }
 
@@ -122,8 +122,9 @@ public class CassandraMessageDAO {
         this(session, typesProvider, 
CassandraConfiguration.DEFAULT_CONFIGURATION, 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
-    private PreparedStatement prepareSelectAll(Session session) {
-        return session.prepare(select().from(TABLE_NAME));
+    private PreparedStatement prepareSelectBatch(Session session, 
CassandraConfiguration cassandraConfiguration) {
+        return session.prepare(select().from(TABLE_NAME)
+            .limit(cassandraConfiguration.getFetchNextPageInAdvanceRow()));
     }
 
     private PreparedStatement prepareSelect(Session session, String[] fields) {
@@ -152,11 +153,13 @@ public class CassandraMessageDAO {
                 .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
-    public Stream<RawMessage> readAll() {
+    public List<RawMessage> readBatch() {
         return cassandraUtils.convertToStream(
-            
cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
+            cassandraAsyncExecutor.execute(selectByBatch.bind()
+                .setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
                 .join())
-            .map(this::fromRow);
+            .map(this::fromRow)
+            .collect(Guavate.toImmutableList());
     }
 
     public CompletableFuture<Void> save(MailboxMessage message) throws 
MailboxException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c333a50/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 3d50f84..db87065 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -116,7 +116,20 @@ public class V1ToV2Migration implements Migration {
 
     @Override
     public MigrationResult run() {
-        return messageDAOV1.readAll()
+        boolean allResultFetched = false;
+        MigrationResult result = MigrationResult.COMPLETED;
+
+        while (!allResultFetched) {
+            List<CassandraMessageDAO.RawMessage> batch = 
messageDAOV1.readBatch();
+            allResultFetched = batch.isEmpty();
+            result = Migration.combine(result, migrateBatch(batch));
+        }
+        return result;
+    }
+
+    private MigrationResult migrateBatch(List<CassandraMessageDAO.RawMessage> 
batch) {
+        return batch
+            .stream()
             .map(this::migrate)
             .reduce(MigrationResult.COMPLETED, Migration::combine);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to