Makes consistency configurable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19d30eaf Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19d30eaf Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19d30eaf Branch: refs/heads/master Commit: 19d30eafc77095ee74ae126f9d0a849e997b6ad7 Parents: 0326629 Author: Todd Nine <[email protected]> Authored: Mon Oct 19 13:59:08 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Oct 19 13:59:08 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 4 +--- .../map/impl/MapSerializationImpl.java | 21 ++++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 6b2eb45..67d0dab 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -29,9 +29,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.google.common.base.Optional; -import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.apache.usergrid.persistence.queue.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +65,7 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueFig; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.apache.usergrid.persistence.queue.QueueMessage; http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java index 1aa3229..ffe10c9 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java @@ -33,6 +33,8 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey; import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer; +import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; @@ -105,15 +107,9 @@ public class MapSerializationImpl implements MapSerialization { /** * How to funnel keys for buckets */ - private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() { + private static final Funnel<String> MAP_KEY_FUNNEL = ( key, into ) -> into.putString( key, StringHashUtils.UTF8 ); - @Override - public void funnel( final String key, final PrimitiveSink into ) { - into.putString( key, StringHashUtils.UTF8 ); - } - }; - /** * Locator to get us all buckets */ @@ -121,10 +117,14 @@ public class MapSerializationImpl implements MapSerialization { new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS ); private final Keyspace keyspace; + private final CassandraConfig cassandraConfig; @Inject - public MapSerializationImpl( final Keyspace keyspace ) {this.keyspace = keyspace;} + public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) { + this.keyspace = keyspace; + this.cassandraConfig = cassandraConfig; + } @Override @@ -387,7 +387,7 @@ public class MapSerializationImpl implements MapSerialization { //now get all columns, including the "old row key value" try { final Column<Boolean> result = - keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM ) + keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() ) .getKey( entryRowKey ).getColumn( true ).execute().getResult(); return result; @@ -421,7 +421,8 @@ public class MapSerializationImpl implements MapSerialization { //now get all columns, including the "old row key value" try { final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows = - keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute() + keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice( + rowKeys ).withColumnSlice( true ).execute() .getResult();
