Adds a strong-consistency read to maps. Persists ES batches into Cassandra for multi-region execution.
A bug in wiring JSON to SQS still exists, it's incorrectly escaping some message subtypes. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/94a90781 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/94a90781 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/94a90781 Branch: refs/heads/usergrid-1007-shiro-cache Commit: 94a9078125fc32d755e33e562f8e8fd8624641c1 Parents: 2b22c61 Author: Todd Nine <[email protected]> Authored: Fri Oct 16 18:02:44 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Oct 16 18:02:44 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 186 +++++++++++--- .../asyncevents/AsyncEventService.java | 1 + .../asyncevents/AsyncIndexProvider.java | 22 +- .../asyncevents/model/AsyncEvent.java | 3 +- .../model/ElasticsearchIndexEvent.java | 50 ++++ .../index/IndexProcessorFig.java | 8 + .../util/ObjectJsonSerializer.java | 74 ++++++ .../index/AmazonAsyncEventServiceTest.java | 6 +- .../index/AsyncIndexServiceTest.java | 2 +- .../usergrid/persistence/map/MapManager.java | 25 +- .../persistence/map/impl/MapManagerImpl.java | 6 + .../persistence/map/impl/MapSerialization.java | 27 +- .../map/impl/MapSerializationImpl.java | 248 ++++++++++--------- .../index/impl/DeIndexOperation.java | 4 + .../persistence/index/impl/IndexOperation.java | 4 + .../index/impl/IndexOperationMessage.java | 5 + .../persistence/queue/DefaultQueueManager.java | 12 +- .../persistence/queue/QueueManager.java | 8 +- .../queue/impl/SNSQueueManagerImpl.java | 188 ++++++++++---- .../queue/impl/SQSQueueManagerImpl.java | 28 ++- .../services/queues/ImportQueueManager.java | 9 +- 21 files changed, 666 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 95126c6..c9f0953 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -21,13 +21,20 @@ package org.apache.usergrid.corepersistence.asyncevents; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.base.Optional; + +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; +import org.apache.usergrid.exception.NotImplementedException; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +61,13 @@ import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import 
org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.apache.usergrid.persistence.queue.QueueMessage; @@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService { private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); + private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( ); + // SQS maximum receive messages is 10 private static final int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars private final QueueManager queue; - private final QueueScope queueScope; private final IndexProcessorFig indexProcessorFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; @@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService { private final AtomicLong counter = new AtomicLong(); private final AtomicLong inFlight = new AtomicLong(); private final Histogram messageCycle; + private final MapManager esMapPersistence; //the actively running subscription private List<Subscription> subscriptions = new ArrayList<>(); @@ -123,6 +137,7 @@ public class AmazonAsyncEventService implements AsyncEventService { final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory, final EventBuilder eventBuilder, + final MapManagerFactory mapManagerFactory, final RxTaskScheduler rxTaskScheduler ) { this.indexProducer = indexProducer; @@ -130,10 +145,16 @@ public class AmazonAsyncEventService implements AsyncEventService { this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = 
entityIndexFactory; this.eventBuilder = eventBuilder; + + final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents"); + + this.esMapPersistence = mapManagerFactory.createMapManager( mapScope ); + this.rxTaskScheduler = rxTaskScheduler; - this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); + QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); this.queue = queueManagerFactory.getQueueManager(queueScope); + this.indexProcessorFig = indexProcessorFig; this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); @@ -158,7 +179,7 @@ public class AmazonAsyncEventService implements AsyncEventService { /** * Offer the EntityIdScope to SQS */ - private void offer(final Object operation) { + private void offer(final Serializable operation) { final Timer.Context timer = this.writeTimer.time(); try { @@ -213,7 +234,7 @@ public class AmazonAsyncEventService implements AsyncEventService { final Timer.Context timer = this.ackTimer.time(); try{ - queue.commitMessage(message); + queue.commitMessage( message ); //decrement our in-flight counter inFlight.decrementAndGet(); @@ -235,7 +256,7 @@ public class AmazonAsyncEventService implements AsyncEventService { final Timer.Context timer = this.ackTimer.time(); try{ - queue.commitMessages(messages); + queue.commitMessages( messages ); //decrement our in-flight counter inFlight.decrementAndGet(); @@ -296,7 +317,13 @@ public class AmazonAsyncEventService implements AsyncEventService { handleInitializeApplicationIndex(event, message); indexoperationObservable = Observable.just(new IndexOperationMessage()); validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. 
- } else { + } else if (event instanceof ElasticsearchIndexEvent){ + handleIndexOperation( (ElasticsearchIndexEvent)event ); + indexoperationObservable = Observable.just( new IndexOperationMessage() ); + validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. + } + + else { throw new Exception("Unknown EventType");//TODO: print json instead } @@ -434,6 +461,85 @@ public class AmazonAsyncEventService implements AsyncEventService { offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); } + + /** + * Queue up an indexOperationMessage for multi region execution + * @param indexOperationMessage + */ + public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { + + final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage ); + + final UUID newMessageId = UUIDGenerator.newTimeUUID(); + + //write to the map in ES + esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() ); + + + + //now queue up the index message + + final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId ); + + //send to the topic so all regions index the batch + try { + queue.sendMessageToTopic( elasticsearchIndexEvent ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to pulish to topic", e ); + } + } + + public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ + Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); + + final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); + + Preconditions.checkNotNull( messageId, "messageId must not be null" ); + + + //load the entity + + final String message = esMapPersistence.getString( messageId.toString() ); + + String highConsistency = null; + + if(message == null){ + logger.error( "Receive message with id {} to process, unable to find it, reading 
with higher consistency level" ); + + highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); + + } + + //read the value from the string + + final IndexOperationMessage indexOperationMessage; + + //our original local read has it, parse it. + if(message != null){ + indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class ); + } + //we tried to read it at a higher consistency level and it works + else if (highConsistency != null){ + indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class ); + } + + //we couldn't find it, bail + else{ + logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" ); + + throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); + } + + + + //now execute it + indexProducer.put(indexOperationMessage).toBlocking().last(); + + } + + + @Override public long getQueueDepth() { return queue.getQueueDepth(); @@ -510,71 +616,75 @@ public class AmazonAsyncEventService implements AsyncEventService { synchronized (mutex) { Observable<List<QueueMessage>> consumer = - Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() { + Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { @Override - public void call(final Subscriber<? super List<QueueMessage>> subscriber) { + public void call( final Subscriber<? 
super List<QueueMessage>> subscriber ) { //name our thread so it's easy to see - Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet()); + Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); List<QueueMessage> drainList = null; do { try { - drainList = take().toList().toBlocking().lastOrDefault(null); + drainList = take().toList().toBlocking().lastOrDefault( null ); //emit our list in it's entity to hand off to a worker pool - subscriber.onNext(drainList); + subscriber.onNext( drainList ); //take since we're in flight - inFlight.addAndGet(drainList.size()); - } catch (Throwable t) { + inFlight.addAndGet( drainList.size() ); + } + catch ( Throwable t ) { final long sleepTime = indexProcessorFig.getFailureRetryTime(); - logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t); + logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t ); - if (drainList != null) { - inFlight.addAndGet(-1 * drainList.size()); + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); } try { - Thread.sleep(sleepTime); - } catch (InterruptedException ie) { + Thread.sleep( sleepTime ); + } + catch ( InterruptedException ie ) { //swallow } indexErrorCounter.inc(); } } - while (true); + while ( true ); } - }) + } ) //this won't block our read loop, just reads and proceeds - .map(messages -> - { - if (messages == null || messages.size() == 0) { + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { return null; } try { - List<IndexEventResult> indexEventResults = callEventHandlers(messages); - List<QueueMessage> messagesToAck = submitToIndex(indexEventResults); - if (messagesToAck == null || messagesToAck.size() == 0) { - logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages); + List<IndexEventResult> indexEventResults = callEventHandlers( messages ); + List<QueueMessage> messagesToAck = submitToIndex( 
indexEventResults ); + if ( messagesToAck == null || messagesToAck.size() == 0 ) { + logger.error( "No messages came back from the queue operation should have seen " + + messages.size(), messages ); return messagesToAck; } - if(messagesToAck.size()<messages.size()){ - logger.error("Missing messages from queue post operation",messages,messagesToAck); + if ( messagesToAck.size() < messages.size() ) { + logger.error( "Missing messages from queue post operation", messages, + messagesToAck ); } //ack each message, but only if we didn't error. - ack(messagesToAck); + ack( messagesToAck ); return messagesToAck; - } catch (Exception e) { - logger.error("failed to ack messages to sqs", e); + } + catch ( Exception e ) { + logger.error( "failed to ack messages to sqs", e ); return null; //do not rethrow so we can process all of them } - }); + } ); //start in the background @@ -619,12 +729,8 @@ public class AmazonAsyncEventService implements AsyncEventService { //send the batch //TODO: should retry? 
- try { - indexProducer.put(combined).toBlocking().lastOrDefault(null); - }catch (Exception e){ - logger.error("Failed to submit to index producer",e); - throw e; - } + queueIndexOperationMessage( combined ); + return messagesToAck; } @@ -671,4 +777,6 @@ public class AmazonAsyncEventService implements AsyncEventService { return creationTime; } } + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index ae5688c..dcfffcb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.IndexLocationStrategy; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index e9e36f0..3865ecb 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@ -51,20 +52,18 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory entityIndexFactory; private final IndexProducer indexProducer; + private final MapManagerFactory mapManagerFactory; private AsyncEventService asyncEventService; @Inject - public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig, - final QueueManagerFactory queueManagerFactory, - final MetricsFactory metricsFactory, - final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EventBuilder eventBuilder, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory, - final IndexProducer indexProducer) { + public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, + final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final + EntityCollectionManagerFactory entityCollectionManagerFactory, + final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer, + final MapManagerFactory mapManagerFactory ) { this.indexProcessorFig = 
indexProcessorFig; this.queueManagerFactory = queueManagerFactory; @@ -75,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = entityIndexFactory; this.indexProducer = indexProducer; + this.mapManagerFactory = mapManagerFactory; } @@ -99,10 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); case SQS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); case SNS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 6b45297..1af54e3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -39,7 +39,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ), @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ), - @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ) + @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), + @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ) } ) public abstract class AsyncEvent implements Serializable { http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java new file mode 100644 index 0000000..207b15e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + + +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * An index event for publishing to elastic search + */ +public final class ElasticsearchIndexEvent extends AsyncEvent { + + + @JsonProperty + protected UUID indexBatchId; + + public ElasticsearchIndexEvent() { + } + + public ElasticsearchIndexEvent( UUID indexBatchId ) { + this.indexBatchId = indexBatchId; + } + + + /** + * Get the unique message id of the + * @return + */ + public UUID getIndexBatchId() { + return indexBatchId; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 7d022e5..6fd73b4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -103,4 +103,12 @@ public interface IndexProcessorFig extends GuicyFig { @Default("false") @Key("elasticsearch.queue_impl.resolution") boolean resolveSynchronously(); + + /** + * Get the message TTL in milliseconds + * @return + */ + @Default("604800000") + @Key( "elasticsearch.message.ttl" ) + int getIndexMessageTtl(); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java new file mode 100644 index 0000000..dbd5ca3 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.usergrid.corepersistence.util; + + +import java.io.IOException; +import java.io.Serializable; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + + +/** + * An utility class to serialize and de-serialized objects as json strings + */ +public final class ObjectJsonSerializer { + + + private final JsonFactory JSON_FACTORY = new JsonFactory(); + + private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY ); + + public ObjectJsonSerializer( ) { + MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); + } + + + public <T extends Serializable> String toByteBuffer( final T toSerialize ) { + + Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" ); + final String stringValue; + //mark this version as empty + + //Convert to internal entity map + try { + stringValue = MAPPER.writeValueAsString( toSerialize ); + } + catch ( JsonProcessingException jpe ) { + throw new RuntimeException( "Unable to serialize entity", jpe ); + } + + return stringValue; + } + + + public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) { + + Preconditions.checkNotNull( value, "value must not be null" ); + + try { + return MAPPER.readValue( value, toSerialize ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to deserialize", e ); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index 
a14437c..e83d6f8 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@ -79,13 +80,16 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Inject public IndexLocationStrategyFactory indexLocationStrategyFactory; + @Inject + public MapManagerFactory mapManagerFactory; + @Inject public EntityIndexFactory entityIndexFactory; @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); + return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index d34a1a9..2863cbf 100644 --- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -189,7 +189,7 @@ public abstract class AsyncIndexServiceTest { } try { - Thread.sleep( 100 ); + Thread.sleep( 10000 ); } catch ( InterruptedException e ) { //swallow http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java index c280fee..80e2d17 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java @@ -33,7 +33,14 @@ public interface MapManager { /** * Return the string, null if not found */ - public String getString( final String key ); + String getString( final String key ); + + /** + * Read the string at a high consistency level. 
This should ensure data replication has happened + * @param key + * @return + */ + String getStringHighConsistency(final String key); /** @@ -41,12 +48,12 @@ public interface MapManager { * @param keys * @return */ - public Map<String, String> getStrings(final Collection<String> keys); + Map<String, String> getStrings( final Collection<String> keys ); /** * Return the string, null if not found */ - public void putString( final String key, final String value ); + void putString( final String key, final String value ); /** * The time to live (in seconds) of the string @@ -54,33 +61,33 @@ public interface MapManager { * @param value * @param ttl */ - public void putString( final String key, final String value, final int ttl ); + void putString( final String key, final String value, final int ttl ); /** * Return the uuid, null if not found */ - public UUID getUuid( final String key ); + UUID getUuid( final String key ); /** * Return the uuid, null if not found */ - public void putUuid( final String key, final UUID putUuid ); + void putUuid( final String key, final UUID putUuid ); /** * Return the long, null if not found */ - public Long getLong( final String key ); + Long getLong( final String key ); /** * Return the long, null if not found */ - public void putLong( final String key, final Long value ); + void putLong( final String key, final Long value ); /** * Delete the key * * @param key The key used to delete the entry */ - public void delete( final String key ); + void delete( final String key ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java index fb2e7ff..501ade7 
100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java @@ -53,6 +53,12 @@ public class MapManagerImpl implements MapManager { @Override + public String getStringHighConsistency( final String key ) { + return mapSerialization.getStringHighConsistency(scope, key); + } + + + @Override public Map<String, String> getStrings( final Collection<String> keys ) { return mapSerialization.getStrings( scope, keys ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java index 2e958c2..e9c21d2 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java @@ -32,50 +32,59 @@ public interface MapSerialization extends Migration { /** * Return the string, null if not found */ - public String getString( final MapScope scope, final String key ); + String getString( final MapScope scope, final String key ); + + + /** + * Get the key from all regions with a high consistency + * @param scope + * @param key + * @return + */ + String getStringHighConsistency( final MapScope scope, final String key ); /** * Get strings from the map * @param keys * @return */ - public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ); + Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ); /** * Return the string, null if 
not found */ - public void putString( final MapScope scope, final String key, final String value ); + void putString( final MapScope scope, final String key, final String value ); /** * Write the string */ - public void putString( final MapScope scope, final String key, final String value, final int ttl ); + void putString( final MapScope scope, final String key, final String value, final int ttl ); /** * Return the uuid, null if not found */ - public UUID getUuid( final MapScope scope, final String key ); + UUID getUuid( final MapScope scope, final String key ); /** * Return the uuid, null if not found */ - public void putUuid( final MapScope scope, final String key, final UUID putUuid ); + void putUuid( final MapScope scope, final String key, final UUID putUuid ); /** * Return the long, null if not found */ - public Long getLong( final MapScope scope, final String key ); + Long getLong( final MapScope scope, final String key ); /** * Return the long, null if not found */ - public void putLong( final MapScope scope, final String key, final Long value ); + void putLong( final MapScope scope, final String key, final Long value ); /** * Delete the key * * @param key The key used to delete the entry */ - public void delete( final MapScope scope, final String key ); + void delete( final MapScope scope, final String key ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java index 825d636..1aa3229 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java +++ 
b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java @@ -18,6 +18,8 @@ */ package org.apache.usergrid.persistence.map.impl; + + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -26,21 +28,21 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import com.google.common.base.Preconditions; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey; +import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer; import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey; -import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; +import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator; import org.apache.usergrid.persistence.core.shard.StringHashUtils; import org.apache.usergrid.persistence.map.MapScope; +import com.google.common.base.Preconditions; import com.google.common.hash.Funnel; import com.google.common.hash.PrimitiveSink; import com.google.inject.Inject; @@ -53,9 +55,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException; import com.netflix.astyanax.model.Column; import com.netflix.astyanax.model.CompositeBuilder; import com.netflix.astyanax.model.CompositeParser; +import com.netflix.astyanax.model.ConsistencyLevel; import com.netflix.astyanax.model.Row; import 
com.netflix.astyanax.model.Rows; -import com.netflix.astyanax.query.ColumnFamilyQuery; import com.netflix.astyanax.serializers.BooleanSerializer; import com.netflix.astyanax.serializers.StringSerializer; @@ -65,41 +67,40 @@ public class MapSerializationImpl implements MapSerialization { private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer(); - private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER = - new BucketScopedRowKeySerializer<>( KEY_SERIALIZER ); + private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER = + new BucketScopedRowKeySerializer<>( KEY_SERIALIZER ); - private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer(); - private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER = - new ScopedRowKeySerializer<>( ENTRY_SERIALIZER ); + private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer(); + private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER = + new ScopedRowKeySerializer<>( ENTRY_SERIALIZER ); - private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get(); + private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get(); - private static final StringSerializer STRING_SERIALIZER = StringSerializer.get(); + private static final StringSerializer STRING_SERIALIZER = StringSerializer.get(); private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder(); - /** - * CFs where the row key contains the source node id - */ - public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> - MAP_ENTRIES = new MultiTennantColumnFamily<>( - "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER ); + /** + * CFs where the row key contains the source node id + */ + public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES = + new MultiTennantColumnFamily<>( "Map_Entries", 
MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER ); - /** - * CFs where the row key contains the source node id - */ - public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS = - new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER ); + /** + * CFs where the row key contains the source node id + */ + public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS = + new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER ); /** * Number of buckets to hash across. */ - private static final int[] NUM_BUCKETS = {20}; + private static final int[] NUM_BUCKETS = { 20 }; /** * How to funnel keys for buckets @@ -107,7 +108,6 @@ public class MapSerializationImpl implements MapSerialization { private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() { - @Override public void funnel( final String key, final PrimitiveSink into ) { into.putString( key, StringHashUtils.UTF8 ); @@ -117,8 +117,8 @@ public class MapSerializationImpl implements MapSerialization { /** * Locator to get us all buckets */ - private static final ExpandingShardLocator<String> - BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS); + private static final ExpandingShardLocator<String> BUCKET_LOCATOR = + new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS ); private final Keyspace keyspace; @@ -129,13 +129,20 @@ public class MapSerializationImpl implements MapSerialization { @Override public String getString( final MapScope scope, final String key ) { - Column<Boolean> col = getValue(scope, key); // TODO: why boolean? - return (col !=null) ? col.getStringValue(): null; + Column<Boolean> col = getValue( scope, key ); + return ( col != null ) ? 
col.getStringValue() : null; } @Override - public Map<String, String> getStrings(final MapScope scope, final Collection<String> keys ) { + public String getStringHighConsistency( final MapScope scope, final String key ) { + Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean? + return ( col != null ) ? col.getStringValue() : null; + } + + + @Override + public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) { return getValues( scope, keys, STRING_RESULTS_BUILDER ); } @@ -144,13 +151,13 @@ public class MapSerializationImpl implements MapSerialization { public void putString( final MapScope scope, final String key, final String value ) { final RowOp op = new RowOp() { @Override - public void putValue(final ColumnListMutation<Boolean> columnListMutation ) { + public void putValue( final ColumnListMutation<Boolean> columnListMutation ) { columnListMutation.putColumn( true, value ); } @Override - public void putKey(final ColumnListMutation<String> keysMutation ) { + public void putKey( final ColumnListMutation<String> keysMutation ) { keysMutation.putColumn( key, true ); } }; @@ -184,10 +191,6 @@ public class MapSerializationImpl implements MapSerialization { /** * Write our string index with the specified row op - * @param scope - * @param key - * @param value - * @param rowOp */ private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) { @@ -225,10 +228,11 @@ public class MapSerializationImpl implements MapSerialization { /** * Callbacks for performing row operations */ - private static interface RowOp{ + private static interface RowOp { /** * Callback to do the row + * * @param columnListMutation The column mutation */ void putValue( final ColumnListMutation<Boolean> columnListMutation ); @@ -236,104 +240,97 @@ public class MapSerializationImpl implements MapSerialization { /** * Write the key - * @param keysMutation */ void putKey( final 
ColumnListMutation<String> keysMutation ); - - } + @Override public UUID getUuid( final MapScope scope, final String key ) { - Column<Boolean> col = getValue(scope, key); - return (col !=null) ? col.getUUIDValue(): null; + Column<Boolean> col = getValue( scope, key ); + return ( col != null ) ? col.getUUIDValue() : null; } @Override public void putUuid( final MapScope scope, final String key, final UUID putUuid ) { - Preconditions.checkNotNull(scope, "mapscope is required"); + Preconditions.checkNotNull( scope, "mapscope is required" ); Preconditions.checkNotNull( key, "key is required" ); Preconditions.checkNotNull( putUuid, "value is required" ); final MutationBatch batch = keyspace.prepareMutationBatch(); //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key); + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); //serialize to the entry - batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid); + batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid ); //add it to the keys final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); - final BucketScopedRowKey< String> keyRowKey = - BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket); + final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket ); //serialize to the entry - batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true); - - executeBatch(batch); + batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true ); + executeBatch( batch ); } @Override public Long getLong( final MapScope scope, final String key ) { - Column<Boolean> col = getValue(scope, key); - return (col !=null) ? col.getLongValue(): null; + Column<Boolean> col = getValue( scope, key ); + return ( col != null ) ? 
col.getLongValue() : null; } - - @Override public void putLong( final MapScope scope, final String key, final Long value ) { - Preconditions.checkNotNull(scope, "mapscope is required"); + Preconditions.checkNotNull( scope, "mapscope is required" ); Preconditions.checkNotNull( key, "key is required" ); Preconditions.checkNotNull( value, "value is required" ); final MutationBatch batch = keyspace.prepareMutationBatch(); //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key); + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); //serialize to the entry - batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value); + batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value ); //add it to the keys final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); - final BucketScopedRowKey< String> keyRowKey = - BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket); + final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket ); //serialize to the entry - batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true); + batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true ); - executeBatch(batch); + executeBatch( batch ); } @Override public void delete( final MapScope scope, final String key ) { final MutationBatch batch = keyspace.prepareMutationBatch(); - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key); + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); //serialize to the entry - batch.withRow(MAP_ENTRIES, entryRowKey).delete(); + batch.withRow( MAP_ENTRIES, entryRowKey ).delete(); //add it to the keys, we're not sure which one it may have come from - final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key ); + final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key ); - final List<BucketScopedRowKey<String>> - rowKeys = BucketScopedRowKey.fromRange( 
scope.getApplication(), key, buckets ); + final List<BucketScopedRowKey<String>> rowKeys = + BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets ); - for(BucketScopedRowKey<String> rowKey: rowKeys) { + for ( BucketScopedRowKey<String> rowKey : rowKeys ) { batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key ); } @@ -345,34 +342,53 @@ public class MapSerializationImpl implements MapSerialization { public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() { final MultiTennantColumnFamilyDefinition mapEntries = - new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, - BytesType.class.getSimpleName(), - BytesType.class.getSimpleName(), - BytesType.class.getSimpleName(), - MultiTennantColumnFamilyDefinition.CacheOption.KEYS ); + new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(), + BytesType.class.getSimpleName(), BytesType.class.getSimpleName(), + MultiTennantColumnFamilyDefinition.CacheOption.KEYS ); final MultiTennantColumnFamilyDefinition mapKeys = - new MultiTennantColumnFamilyDefinition( MAP_KEYS, - BytesType.class.getSimpleName(), - UTF8Type.class.getSimpleName(), - BytesType.class.getSimpleName(), - MultiTennantColumnFamilyDefinition.CacheOption.KEYS ); + new MultiTennantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(), + UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(), + MultiTennantColumnFamilyDefinition.CacheOption.KEYS ); return Arrays.asList( mapEntries, mapKeys ); } - private Column<Boolean> getValue(MapScope scope, String key) { + private Column<Boolean> getValue( MapScope scope, String key ) { + + + //add it to the entry + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + + //now get all columns, including the "old row key value" + try { + final Column<Boolean> result = + keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult(); + + return result; + } + catch ( NotFoundException nfe ) { 
+ //nothing to return + return null; + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to cassandra", e ); + } + } + + private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) { //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key); + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); //now get all columns, including the "old row key value" try { - final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES ) - .getKey( entryRowKey ).getColumn( true ).execute().getResult(); + final Column<Boolean> result = + keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM ) + .getKey( entryRowKey ).getColumn( true ).execute().getResult(); return result; } @@ -388,52 +404,45 @@ public class MapSerializationImpl implements MapSerialization { /** * Get multiple values, using the string builder - * @param scope - * @param keys - * @param builder - * @param <T> - * @return */ - private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) { + private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) { final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() ); - for(final String key: keys){ - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key); + for ( final String key : keys ) { + //add it to the entry + final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); rowKeys.add( entryRowKey ); - } + //now get all columns, including the "old row key value" + try { + final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows = + keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute() + .getResult(); - //now get all columns, including the "old row key value" - try { - final 
Rows<ScopedRowKey<MapEntryKey>, Boolean> - rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ) - .execute().getResult(); - - - return builder.buildResults( rows ); - } - catch ( NotFoundException nfe ) { - //nothing to return - return null; - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to cassandra", e ); - } - } + return builder.buildResults( rows ); + } + catch ( NotFoundException nfe ) { + //nothing to return + return null; + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to cassandra", e ); + } + } - private void executeBatch(MutationBatch batch) { + private void executeBatch( MutationBatch batch ) { try { batch.execute(); - } catch (ConnectionException e) { - throw new RuntimeException("Unable to connect to cassandra", e); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to cassandra", e ); } } @@ -501,8 +510,7 @@ public class MapSerializationImpl implements MapSerialization { /** * Create a scoped row key from the key */ - public static ScopedRowKey<MapEntryKey> fromKey( - final MapScope mapScope, final String key ) { + public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) { return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) ); } @@ -511,32 +519,32 @@ public class MapSerializationImpl implements MapSerialization { /** * Build the results from the row keys - * @param <T> */ private static interface ResultsBuilder<T> { - public T buildResults(final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows); + public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ); } - public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{ + + public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> { @Override public Map<String, String> buildResults( final 
Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) { final int size = rows.size(); - final Map<String, String> results = new HashMap<>(size); + final Map<String, String> results = new HashMap<>( size ); - for(int i = 0; i < size; i ++){ + for ( int i = 0; i < size; i++ ) { final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i ); final String value = row.getColumns().getStringValue( true, null ); - if(value == null){ + if ( value == null ) { continue; } - results.put( row.getKey().getKey().key, value ); + results.put( row.getKey().getKey().key, value ); } return results; http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java index 4f47749..4060dac 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.model.entity.Id; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId; @@ -42,7 +43,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd @JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" ) public class DeIndexOperation implements BatchOperation 
{ + @JsonProperty private String[] indexes; + + @JsonProperty private String documentId; http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java index fae809f..28f2e0d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; +import com.fasterxml.jackson.annotation.JsonProperty; /** @@ -37,9 +38,12 @@ import org.elasticsearch.client.Client; */ public class IndexOperation implements BatchOperation { + @JsonProperty public String writeAlias; + @JsonProperty public String documentId; + @JsonProperty public Map<String, Object> data; public IndexOperation( final String writeAlias, final ApplicationScope applicationScope, IndexEdge indexEdge, http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java index 12df390..bcee308 100644 --- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; @@ -33,9 +34,13 @@ import com.google.common.base.Optional; * Container for index operations. */ public class IndexOperationMessage implements Serializable { + @JsonProperty private final Set<IndexOperation> indexRequests; + + @JsonProperty private final Set<DeIndexOperation> deIndexRequests; + @JsonProperty private long creationTime; http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index d974529..5201279 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.queue; import rx.Observable; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -67,12 +68,21 @@ public class DefaultQueueManager implements QueueManager { } } + @Override - public synchronized void sendMessage(Object body) throws IOException { + public <T extends Serializable> void sendMessage( final T body ) throws IOException { String 
uuid = UUID.randomUUID().toString(); queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here")); + } + + @Override + public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { + sendMessage( body ); + } + + @Override public void deleteQueue() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java index 027abb2..dc3d1b5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java @@ -68,7 +68,13 @@ public interface QueueManager { * @param body * @throws IOException */ - void sendMessage(Object body)throws IOException; + <T extends Serializable> void sendMessage(T body)throws IOException; + + /** + * Send a messae to the topic to be sent to other queues + * @param body + */ + <T extends Serializable> void sendMessageToTopic(T body) throws IOException; /** * purge messages http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index d476f76..59ecd24 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -18,15 +18,55 @@ package org.apache.usergrid.persistence.queue.impl; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; +import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; +import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; +import org.apache.usergrid.persistence.queue.Queue; +import org.apache.usergrid.persistence.queue.QueueFig; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; + import com.amazonaws.AmazonServiceException; import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.sns.AmazonSNSAsyncClient; import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.*; +import com.amazonaws.services.sns.model.PublishRequest; +import com.amazonaws.services.sns.model.PublishResult; +import com.amazonaws.services.sns.model.SubscribeRequest; +import com.amazonaws.services.sns.model.SubscribeResult; +import com.amazonaws.services.sqs.AmazonSQSAsyncClient; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.*; +import com.amazonaws.services.sqs.model.BatchResultErrorEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import 
com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; +import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.DeleteQueueRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -36,20 +76,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import org.apache.usergrid.persistence.core.astyanax.CassandraFig; -import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; -import org.apache.usergrid.persistence.queue.*; -import org.apache.usergrid.persistence.queue.Queue; -import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; -import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; public class SNSQueueManagerImpl implements QueueManager { @@ -59,10 +85,10 @@ public class SNSQueueManagerImpl implements QueueManager { private final QueueFig fig; private final ClusterFig clusterFig; private final CassandraFig cassandraFig; - private final QueueFig queueFig; 
private final AmazonSQSClient sqs; private final AmazonSNSClient sns; private final AmazonSNSAsyncClient snsAsync; + private final AmazonSQSAsyncClient sqsAsync; private final JsonFactory JSON_FACTORY = new JsonFactory(); @@ -110,6 +136,7 @@ public class SNSQueueManagerImpl implements QueueManager { }); + @Inject public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, QueueFig queueFig) { @@ -117,12 +144,21 @@ public class SNSQueueManagerImpl implements QueueManager { this.fig = fig; this.clusterFig = clusterFig; this.cassandraFig = cassandraFig; - this.queueFig = queueFig; + + + // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks + final ExecutorService executor = TaskExecutorFactory + .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), + TaskExecutorFactory.RejectionAction.CALLERRUNS); + + + final Region region = getRegion(); try { - sqs = createSQSClient(getRegion()); - sns = createSNSClient(getRegion()); - snsAsync = createAsyncSNSClient(getRegion()); + sqs = createSQSClient(region); + sns = createSNSClient(region); + snsAsync = createAsyncSNSClient(region, executor); + sqsAsync = createAsyncSQSClient( region, executor ); } catch (Exception e) { throw new RuntimeException("Error setting up mapper", e); @@ -157,7 +193,7 @@ public class SNSQueueManagerImpl implements QueueManager { try { SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn); - sns.subscribe(primarySubscribeRequest); + sns.subscribe(primarySubscribeRequest); // ensure the SNS primary topic has permission to send to the primary SQS queue List<String> primaryTopicArnList = new ArrayList<>(); @@ -276,22 +312,35 @@ public class SNSQueueManagerImpl implements QueueManager { * */ - private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) { + private AmazonSNSAsyncClient 
createAsyncSNSClient(final Region region, final ExecutorService executor) { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks - final Executor executor = TaskExecutorFactory - .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), - TaskExecutorFactory.RejectionAction.CALLERRUNS); - - final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor); + final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor); sns.setRegion(region); return sns; } + + /** + * Create the async sqs client + * @param region + * @param executor + * @return + */ + private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){ + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); + + final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor ); + + sqs.setRegion( region ); + + return sqs; + + } + /** * The Synchronous SNS client is used for creating topics and subscribing queues. * @@ -369,7 +418,12 @@ public class SNSQueueManagerImpl implements QueueManager { try { final JsonNode bodyNode = mapper.readTree(message.getBody()); JsonNode bodyObj = bodyNode.has("Message") ? 
bodyNode.get("Message") : bodyNode; - body = fromString(bodyObj.textValue(), klass); + + + + final String bodyText = mapper.writeValueAsString( bodyObj );; + + body = fromString(bodyText, klass); } catch (Exception e) { logger.error(String.format("failed to deserialize message: %s", message.getBody()), e); throw new RuntimeException(e); @@ -405,6 +459,40 @@ public class SNSQueueManagerImpl implements QueueManager { } } + + @Override + public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { + if (snsAsync == null) { + logger.error("SNS client is null, perhaps it failed to initialize successfully"); + return; + } + + final String stringBody = toString(body); + + String topicArn = getWriteTopicArn(); + + if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + + PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); + + snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { + @Override + public void onError( Exception e ) { + logger.error( "Error publishing message... {}", e ); + } + + + @Override + public void onSuccess( PublishRequest request, PublishResult result ) { + if ( logger.isDebugEnabled() ) logger + .debug( "Successfully published... 
messageID=[{}], arn=[{}]", result.getMessageId(), + request.getTopicArn() ); + } + } ); + + } + + @Override public void sendMessages(final List bodies) throws IOException { @@ -414,41 +502,47 @@ public class SNSQueueManagerImpl implements QueueManager { } for (Object body : bodies) { - sendMessage(body); + sendMessage((Serializable)body); } } + @Override - public void sendMessage(final Object body) throws IOException { + public <T extends Serializable> void sendMessage( final T body ) throws IOException { - if (snsAsync == null) { - logger.error("SNS client is null, perhaps it failed to initialize successfully"); + if ( snsAsync == null ) { + logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); return; } - final String stringBody = toString(body); + final String stringBody = toString( body ); - String topicArn = getWriteTopicArn(); + String url = getReadQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + if ( logger.isDebugEnabled() ) { + logger.debug( "Publishing Message...{} to url: {}", stringBody, url ); + } - PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); + SendMessageRequest request = new SendMessageRequest( url, stringBody ); - snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { - @Override - public void onError(Exception e) { - logger.error("Error publishing message... {}", e); - } + sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() { @Override - public void onSuccess(PublishRequest request, PublishResult result) { - if (logger.isDebugEnabled()) - logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn()); + public void onError( final Exception e ) { + logger.error( "Error sending message... 
{}", e ); } - }); + + @Override + public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) { + if ( logger.isDebugEnabled() ) { + logger.debug( "Successfully send... messageBody=[{}], url=[{}]", request.getMessageBody(), + request.getQueueUrl() ); + } + } + } ); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index fa9a7ac..0c56c05 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl; import java.io.IOException; +import java.io.Serializable; import java.util.*; import java.util.concurrent.ExecutionException; @@ -243,25 +244,34 @@ public class SQSQueueManagerImpl implements QueueManager { } + @Override - public void sendMessage(final Object body) throws IOException { + public <T extends Serializable> void sendMessage( final T body ) throws IOException { if (sqs == null) { - logger.error("Sqs is null"); - return; - } + logger.error("Sqs is null"); + return; + } - String url = getQueue().getUrl(); + String url = getQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url); + if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url); - final String stringBody = toString(body); + final String stringBody = toString(body); - SendMessageRequest request = 
new SendMessageRequest(url, stringBody); - sqs.sendMessage(request); + SendMessageRequest request = new SendMessageRequest(url, stringBody); + sqs.sendMessage(request); + } + + + + @Override + public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { + sendMessage( body ); } + @Override public void commitMessage(final QueueMessage queueMessage) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index bca9a49..bc55ff4 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -22,6 +22,7 @@ package org.apache.usergrid.services.queues; import java.io.IOException; +import java.io.Serializable; import java.util.List; import org.apache.usergrid.persistence.queue.QueueManager; @@ -65,7 +66,13 @@ public class ImportQueueManager implements QueueManager { @Override - public void sendMessage( final Object body ) throws IOException { + public <T extends Serializable> void sendMessage( final T body ) throws IOException { + + } + + + @Override + public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { }
