Repository: james-project
Updated Branches:
  refs/heads/master 9ed12378a -> ea823c645


JAMES-2096 Add a fetch size to migration process V1 initial read


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d640a485
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d640a485
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d640a485

Branch: refs/heads/master
Commit: d640a485b63889cf3beb3804e63a16e7699d3b72
Parents: 9ed1237
Author: benwa <btell...@linagora.com>
Authored: Mon Jul 31 11:21:49 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Tue Aug 1 12:45:20 2017 +0700

----------------------------------------------------------------------
 .../cassandra/CassandraConfiguration.java       | 30 +++++++++++++++++---
 .../cassandra/CassandraConfigurationTest.java   | 19 +++++++++++++
 .../cassandra/mail/CassandraMessageDAO.java     |  2 +-
 .../modules/mailbox/CassandraSessionModule.java |  3 ++
 .../mailbox/CassandraSessionModuleTest.java     |  1 +
 .../modules/mailbox/cassandra.properties        |  1 +
 src/site/xdoc/server/config-cassandra.xml       |  2 ++
 7 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
index 5f0850b..5de74a4 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
@@ -42,6 +42,7 @@ public class CassandraConfiguration {
     public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024;
     public static final int DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH = 1000;
     public static final int DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT = 2;
+    public static final int DEFAULT_MIGRATION_V1_READ_FETCH_SIZE = 10;
 
     public static class Builder {
         private Optional<Integer> messageReadChunkSize = Optional.empty();
@@ -57,6 +58,7 @@ public class CassandraConfiguration {
         private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty();
         private Optional<Integer> v1ToV2QueueLength = Optional.empty();
         private Optional<Integer> v1ToV2ThreadCount = Optional.empty();
+        private Optional<Integer> v1ReadFetchSize = Optional.empty();
 
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs 
to be strictly positive");
@@ -130,6 +132,12 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder v1ReadFetchSize(int value) {
+            Preconditions.checkArgument(value > 0, "v1ReadFetchSize needs to 
be strictly positive");
+            this.v1ReadFetchSize = Optional.of(value);
+            return this;
+        }
+
         public Builder onTheFlyV1ToV2Migration(boolean value) {
             this.onTheFlyV1ToV2Migration = Optional.of(value);
             return this;
@@ -200,6 +208,11 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder v1ReadFetchSize(Optional<Integer> value) {
+            value.ifPresent(this::v1ReadFetchSize);
+            return this;
+        }
+
         public CassandraConfiguration build() {
             return new 
CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
                 
messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
@@ -213,7 +226,8 @@ public class CassandraConfiguration {
                 blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE),
                 
onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2),
                 
v1ToV2QueueLength.orElse(DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH),
-                
v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT));
+                
v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT),
+                v1ReadFetchSize.orElse(DEFAULT_MIGRATION_V1_READ_FETCH_SIZE));
         }
     }
 
@@ -234,13 +248,14 @@ public class CassandraConfiguration {
     private final boolean onTheFlyV1ToV2Migration;
     private final int v1ToV2QueueLength;
     private final int v1ToV2ThreadCount;
+    private final int v1ReadFetchSize;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int 
expungeChunkSize,
                            int flagsUpdateChunkSize, int 
flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry,
                            int modSeqMaxRetry, int uidMaxRetry, int 
fetchNextPageInAdvanceRow,
                            int blobPartSize, boolean onTheFlyV1ToV2Migration, 
int v1ToV2QueueLength,
-                           int v1ToV2ThreadCount
+                           int v1ToV2ThreadCount, int v1ReadFetchSize
     ) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
@@ -255,6 +270,7 @@ public class CassandraConfiguration {
         this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration;
         this.v1ToV2QueueLength = v1ToV2QueueLength;
         this.v1ToV2ThreadCount = v1ToV2ThreadCount;
+        this.v1ReadFetchSize = v1ReadFetchSize;
     }
 
     public int getBlobPartSize() {
@@ -309,6 +325,10 @@ public class CassandraConfiguration {
         return v1ToV2ThreadCount;
     }
 
+    public int getV1ReadFetchSize() {
+        return v1ReadFetchSize;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof CassandraConfiguration) {
@@ -326,7 +346,8 @@ public class CassandraConfiguration {
                 && Objects.equals(this.blobPartSize, that.blobPartSize)
                 && Objects.equals(this.onTheFlyV1ToV2Migration, 
that.onTheFlyV1ToV2Migration)
                 && Objects.equals(this.v1ToV2ThreadCount, 
that.v1ToV2ThreadCount)
-                && Objects.equals(this.v1ToV2QueueLength, 
that.v1ToV2QueueLength);
+                && Objects.equals(this.v1ToV2QueueLength, 
that.v1ToV2QueueLength)
+                && Objects.equals(this.v1ReadFetchSize, that.v1ReadFetchSize);
         }
         return false;
     }
@@ -335,7 +356,7 @@ public class CassandraConfiguration {
     public final int hashCode() {
         return Objects.hash(aclMaxRetry, messageReadChunkSize, 
expungeChunkSize, flagsUpdateMessageIdMaxRetry,
             flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, 
fetchNextPageInAdvanceRow, flagsUpdateChunkSize,
-            blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, 
v1ToV2QueueLength);
+            blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, 
v1ToV2QueueLength, v1ReadFetchSize);
     }
 
     @Override
@@ -354,6 +375,7 @@ public class CassandraConfiguration {
             .add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration)
             .add("v1ToV2ThreadCount", v1ToV2ThreadCount)
             .add("v1ToV2QueueLength", v1ToV2QueueLength)
+            .add("v1ReadFetchSize", v1ReadFetchSize)
             .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
index e88d4a8..b38dbf8 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
@@ -224,6 +224,22 @@ public class CassandraConfigurationTest {
     }
 
     @Test
+    public void v1ReadFetchSizeShouldThrowOnNegative() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .v1ReadFetchSize(-1);
+    }
+
+    @Test
+    public void v1ReadFetchSizeShouldThrowOnZero() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        CassandraConfiguration.builder()
+            .v1ReadFetchSize(0); // zero must be rejected: the builder requires a strictly positive fetch size
+    }
+
+    @Test
     public void builderShouldCreateTheRightObject() {
         int aclMaxRetry = 1;
         int modSeqMaxRetry = 2;
@@ -238,6 +254,7 @@ public class CassandraConfigurationTest {
         boolean onTheFlyV1ToV2Migration = true;
         int v1ToV2ThreadCount = 11;
         int v1ToV2QueueLength = 12;
+        int v1ReadFetchSize = 13;
 
         CassandraConfiguration configuration = CassandraConfiguration.builder()
             .aclMaxRetry(aclMaxRetry)
@@ -253,6 +270,7 @@ public class CassandraConfigurationTest {
             .onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration)
             .v1ToV2ThreadCount(v1ToV2ThreadCount)
             .v1ToV2QueueLength(v1ToV2QueueLength)
+            .v1ReadFetchSize(v1ReadFetchSize)
             .build();
 
         
softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
@@ -268,6 +286,7 @@ public class CassandraConfigurationTest {
         
softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration);
         
softly.assertThat(configuration.getV1ToV2ThreadCount()).isEqualTo(v1ToV2ThreadCount);
         
softly.assertThat(configuration.getV1ToV2QueueLength()).isEqualTo(v1ToV2QueueLength);
+        
softly.assertThat(configuration.getV1ReadFetchSize()).isEqualTo(v1ReadFetchSize);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 78f6372..ba4802e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -154,7 +154,7 @@ public class CassandraMessageDAO {
 
     public Stream<RawMessage> readAll() {
         return cassandraUtils.convertToStream(
-            cassandraAsyncExecutor.execute(selectAll.bind())
+            
cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
                 .join())
             .map(this::fromRow);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 12dc844..a9447f9 100644
--- 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -98,6 +98,7 @@ public class CassandraSessionModule extends AbstractModule {
     private static final String MIGRATION_V1_V2_ON_THE_FLY = 
"migration.v1.v2.on.the.fly";
     private static final String MIGRATION_V1_V2_THREAD_COUNT = 
"migration.v1.v2.thread.count";
     private static final String MIGRATION_V1_V2_QUEUE_LENGTH = 
"migration.v1.v2.queue.length";
+    public static final String MIGRATION_V1_READ_SIZE = 
"migration.v1.read.fetch.size";
 
     @Override
     protected void configure() {
@@ -303,6 +304,8 @@ public class CassandraSessionModule extends AbstractModule {
                 
propertiesConfiguration.getInteger(MIGRATION_V1_V2_THREAD_COUNT, null)))
             .v1ToV2QueueLength(Optional.ofNullable(
                 
propertiesConfiguration.getInteger(MIGRATION_V1_V2_QUEUE_LENGTH, null)))
+            .v1ReadFetchSize(Optional.ofNullable(
+                propertiesConfiguration.getInteger(MIGRATION_V1_READ_SIZE, 
null)))
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
index 0ae927c..7ccc8a8 100644
--- 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
+++ 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
@@ -63,6 +63,7 @@ public class CassandraSessionModuleTest {
                 .onTheFlyV1ToV2Migration(true)
                 .v1ToV2ThreadCount(11)
                 .v1ToV2QueueLength(12)
+                .v1ReadFetchSize(13)
                 .build());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
 
b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
index a625b28..54ede24 100644
--- 
a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
+++ 
b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
@@ -11,3 +11,4 @@ mailbox.blob.part.size=10
 migration.v1.v2.on.the.fly=true
 migration.v1.v2.thread.count=11
 migration.v1.v2.queue.length=12
+migration.v1.read.fetch.size=13

http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/src/site/xdoc/server/config-cassandra.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/server/config-cassandra.xml 
b/src/site/xdoc/server/config-cassandra.xml
index fab44ec..2534ceb 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -119,6 +119,8 @@
         <dd>Optional. Defaults to 2.<br/> Controls the number of threads used 
to asynchronously migrate from v1 to v2.</dd>
         <dt><strong>migration.v1.v2.queue.length</strong></dt>
         <dd>Optional. Defaults to 1000.<br/> Controls the queue size of v1 to 
v2 migration task. Drops when full.</dd>
+        <dt><strong>migration.v1.read.fetch.size</strong></dt>
+        <dd>Optional. Defaults to 10.<br/> Controls the fetch size of the 
request to retrieve all messages stored in V1 during the migration process.</dd>
       </dl>
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to