Repository: james-project Updated Branches: refs/heads/master 9ed12378a -> ea823c645
JAMESâ2096 Add a fetch size to migration process V1 initial read Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d640a485 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d640a485 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d640a485 Branch: refs/heads/master Commit: d640a485b63889cf3beb3804e63a16e7699d3b72 Parents: 9ed1237 Author: benwa <btell...@linagora.com> Authored: Mon Jul 31 11:21:49 2017 +0700 Committer: benwa <btell...@linagora.com> Committed: Tue Aug 1 12:45:20 2017 +0700 ---------------------------------------------------------------------- .../cassandra/CassandraConfiguration.java | 30 +++++++++++++++++--- .../cassandra/CassandraConfigurationTest.java | 19 +++++++++++++ .../cassandra/mail/CassandraMessageDAO.java | 2 +- .../modules/mailbox/CassandraSessionModule.java | 3 ++ .../mailbox/CassandraSessionModuleTest.java | 1 + .../modules/mailbox/cassandra.properties | 1 + src/site/xdoc/server/config-cassandra.xml | 2 ++ 7 files changed, 53 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java index 5f0850b..5de74a4 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java @@ -42,6 +42,7 @@ public class CassandraConfiguration { public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024; public static final int DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH = 1000; public static final int DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT = 2; + public static final int DEFAULT_MIGRATION_V1_READ_FETCH_SIZE = 10; public static class Builder { private Optional<Integer> messageReadChunkSize = Optional.empty(); @@ -57,6 +58,7 @@ public class CassandraConfiguration { private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty(); private Optional<Integer> v1ToV2QueueLength = Optional.empty(); private Optional<Integer> v1ToV2ThreadCount = Optional.empty(); + private Optional<Integer> v1ReadFetchSize = Optional.empty(); public Builder messageReadChunkSize(int value) { Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive"); @@ -130,6 +132,12 @@ public class CassandraConfiguration { return this; } + public Builder v1ReadFetchSize(int value) { + Preconditions.checkArgument(value > 0, "v1ReadFetchSize needs to be strictly positive"); + this.v1ReadFetchSize = Optional.of(value); + return this; + } + public Builder onTheFlyV1ToV2Migration(boolean value) { this.onTheFlyV1ToV2Migration = Optional.of(value); return this; @@ -200,6 +208,11 @@ public class CassandraConfiguration { return this; } + public Builder v1ReadFetchSize(Optional<Integer> value) { + value.ifPresent(this::v1ReadFetchSize); + return this; + } + public CassandraConfiguration build() { return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY), messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ), @@ -213,7 +226,8 @@ public class CassandraConfiguration { blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE), onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2), v1ToV2QueueLength.orElse(DEFAULT_MIGRATION_V1_TO_V2_QUEUE_LENGTH), - v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT)); + v1ToV2ThreadCount.orElse(DEFAULT_MIGRATION_V1_TO_V2_THREAD_COUNT), + v1ReadFetchSize.orElse(DEFAULT_MIGRATION_V1_READ_FETCH_SIZE)); } } @@ -234,13 +248,14 @@ public class CassandraConfiguration { private final boolean onTheFlyV1ToV2Migration; private final int v1ToV2QueueLength; private final int v1ToV2ThreadCount; + private final int v1ReadFetchSize; @VisibleForTesting CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize, int flagsUpdateChunkSize, int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry, int uidMaxRetry, int fetchNextPageInAdvanceRow, int blobPartSize, boolean onTheFlyV1ToV2Migration, int v1ToV2QueueLength, - int v1ToV2ThreadCount + int v1ToV2ThreadCount, int v1ReadFetchSize ) { this.aclMaxRetry = aclMaxRetry; this.messageReadChunkSize = messageReadChunkSize; @@ -255,6 +270,7 @@ public class CassandraConfiguration { this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration; this.v1ToV2QueueLength = v1ToV2QueueLength; this.v1ToV2ThreadCount = v1ToV2ThreadCount; + this.v1ReadFetchSize = v1ReadFetchSize; } public int getBlobPartSize() { @@ -309,6 +325,10 @@ public class CassandraConfiguration { return v1ToV2ThreadCount; } + public int getV1ReadFetchSize() { + return v1ReadFetchSize; + } + @Override public final boolean equals(Object o) { if (o instanceof CassandraConfiguration) { @@ -326,7 +346,8 @@ public class CassandraConfiguration { && Objects.equals(this.blobPartSize, that.blobPartSize) && Objects.equals(this.onTheFlyV1ToV2Migration, that.onTheFlyV1ToV2Migration) && Objects.equals(this.v1ToV2ThreadCount, that.v1ToV2ThreadCount) - && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength); + && Objects.equals(this.v1ToV2QueueLength, that.v1ToV2QueueLength) + && Objects.equals(this.v1ReadFetchSize, that.v1ReadFetchSize); } return false; } @@ -335,7 +356,7 @@ public class CassandraConfiguration { public final int hashCode() { return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry, flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize, - blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength); + blobPartSize, onTheFlyV1ToV2Migration, v1ToV2ThreadCount, v1ToV2QueueLength, v1ReadFetchSize); } @Override @@ -354,6 +375,7 @@ public class CassandraConfiguration { .add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration) .add("v1ToV2ThreadCount", v1ToV2ThreadCount) .add("v1ToV2QueueLength", v1ToV2QueueLength) + .add("v1ReadFetchSize", v1ReadFetchSize) .toString(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java index e88d4a8..b38dbf8 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java @@ -224,6 +224,22 @@ public class CassandraConfigurationTest { } @Test + public void v1ReadFetchSizeShouldThrowOnNegative() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .v1ReadFetchSize(-1); + } + + @Test + public void v1ReadFetchSizeShouldThrowOnZero() { + expectedException.expect(IllegalArgumentException.class); + + CassandraConfiguration.builder() + .v1ToV2QueueLength(0); + } + + @Test public void builderShouldCreateTheRightObject() { int aclMaxRetry = 1; int modSeqMaxRetry = 2; @@ -238,6 +254,7 @@ public class CassandraConfigurationTest { boolean onTheFlyV1ToV2Migration = true; int v1ToV2ThreadCount = 11; int v1ToV2QueueLength = 12; + int v1ReadFetchSize = 13; CassandraConfiguration configuration = CassandraConfiguration.builder() .aclMaxRetry(aclMaxRetry) @@ -253,6 +270,7 @@ public class CassandraConfigurationTest { .onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration) .v1ToV2ThreadCount(v1ToV2ThreadCount) .v1ToV2QueueLength(v1ToV2QueueLength) + .v1ReadFetchSize(v1ReadFetchSize) .build(); softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry); @@ -268,6 +286,7 @@ public class CassandraConfigurationTest { softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration); softly.assertThat(configuration.getV1ToV2ThreadCount()).isEqualTo(v1ToV2ThreadCount); softly.assertThat(configuration.getV1ToV2QueueLength()).isEqualTo(v1ToV2QueueLength); + softly.assertThat(configuration.getV1ReadFetchSize()).isEqualTo(v1ReadFetchSize); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 78f6372..ba4802e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -154,7 +154,7 @@ public class CassandraMessageDAO { public Stream<RawMessage> readAll() { return cassandraUtils.convertToStream( - cassandraAsyncExecutor.execute(selectAll.bind()) + cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize())) .join()) .map(this::fromRow); } http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java index 12dc844..a9447f9 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java @@ -98,6 +98,7 @@ public class CassandraSessionModule extends AbstractModule { private static final String MIGRATION_V1_V2_ON_THE_FLY = "migration.v1.v2.on.the.fly"; private static final String MIGRATION_V1_V2_THREAD_COUNT = "migration.v1.v2.thread.count"; private static final String MIGRATION_V1_V2_QUEUE_LENGTH = "migration.v1.v2.queue.length"; + public static final String MIGRATION_V1_READ_SIZE = "migration.v1.read.fetch.size"; @Override protected void configure() { @@ -303,6 +304,8 @@ public class CassandraSessionModule extends AbstractModule { propertiesConfiguration.getInteger(MIGRATION_V1_V2_THREAD_COUNT, null))) .v1ToV2QueueLength(Optional.ofNullable( propertiesConfiguration.getInteger(MIGRATION_V1_V2_QUEUE_LENGTH, null))) + .v1ReadFetchSize(Optional.ofNullable( + propertiesConfiguration.getInteger(MIGRATION_V1_READ_SIZE, null))) .build(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java index 0ae927c..7ccc8a8 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java @@ -63,6 +63,7 @@ public class CassandraSessionModuleTest { .onTheFlyV1ToV2Migration(true) .v1ToV2ThreadCount(11) .v1ToV2QueueLength(12) + .v1ReadFetchSize(13) .build()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties index a625b28..54ede24 100644 --- a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties +++ b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties @@ -11,3 +11,4 @@ mailbox.blob.part.size=10 migration.v1.v2.on.the.fly=true migration.v1.v2.thread.count=11 migration.v1.v2.queue.length=12 +migration.v1.read.fetch.size=13 http://git-wip-us.apache.org/repos/asf/james-project/blob/d640a485/src/site/xdoc/server/config-cassandra.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml index fab44ec..2534ceb 100644 --- a/src/site/xdoc/server/config-cassandra.xml +++ b/src/site/xdoc/server/config-cassandra.xml @@ -119,6 +119,8 @@ <dd>Optional. Defaults to 2.<br/> Controls the number of threads used to asynchronously migrate from v1 to v2.</dd> <dt><strong>migration.v1.v2.queue.length</strong></dt> <dd>Optional. Defaults to 1000.<br/> Controls the queue size of v1 to v2 migration task. Drops when full.</dd> + <dt><strong>migration.v1.read.fetch.size</strong></dt> + <dd>Optional. Defaults to 10.<br/> Controls the fetch size of the request to retrieve all messages stored in V1 during the migration process.</dd> </dl> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org