merge from 1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5eed63d4 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5eed63d4 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5eed63d4 Branch: refs/heads/master Commit: 5eed63d43999c8f381e2fbcbe7d2c1fdb6b73729 Parents: 68befd2 4013f17 Author: Shawn Feldman <[email protected]> Authored: Mon Oct 19 15:19:33 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Mon Oct 19 15:19:33 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 377 ++++++++---- .../asyncevents/AsyncEventService.java | 1 + .../asyncevents/AsyncIndexProvider.java | 14 +- .../asyncevents/model/AsyncEvent.java | 8 +- .../model/ElasticsearchIndexEvent.java | 50 ++ .../index/IndexProcessorFig.java | 17 + .../util/ObjectJsonSerializer.java | 92 +++ .../index/AmazonAsyncEventServiceTest.java | 6 +- .../index/AsyncIndexServiceTest.java | 2 +- .../usergrid/persistence/map/MapManager.java | 25 +- .../persistence/map/impl/MapManagerImpl.java | 6 + .../persistence/map/impl/MapSerialization.java | 27 +- .../map/impl/MapSerializationImpl.java | 265 ++++---- .../index/impl/DeIndexOperation.java | 4 + .../persistence/index/impl/IndexOperation.java | 4 + .../index/impl/IndexOperationMessage.java | 5 + .../persistence/queue/LocalQueueManager.java | 12 +- .../persistence/queue/QueueManager.java | 8 +- .../persistence/queue/guice/QueueModule.java | 1 - .../queue/impl/SNSQueueManagerImpl.java | 615 +++++++++++-------- .../queue/impl/SQSQueueManagerImpl.java | 352 ----------- .../services/queues/ImportQueueManager.java | 9 +- 22 files changed, 1017 insertions(+), 883 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 1ad9b86,2bace8d..aafb63a --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@@ -27,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence. import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.LocalQueueManager; + import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.queue.QueueFig; import org.apache.usergrid.persistence.queue.QueueManagerFactory; @@@ -99,16 -103,12 +104,15 @@@ public class AsyncIndexProvider impleme switch (impl) { case LOCAL: - return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); + AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig); ++ entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler); + eventService.MAX_TAKE = 1000; + return eventService; case SQS: - return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig ); + throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region"); case SNS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 8a8c8a5,ec9b315..69d5e18 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@@ -86,7 -86,32 +86,24 @@@ public interface IndexProcessorFig exte String getQueueImplementation(); @Default("1000") - @Key("elasticsearch.reindex.flush.interval") - int getUpdateInterval(); - - @Default("100") - @Key("elasticsearch.buffer.time_ms") - int getBufferTime(); - - @Default("1000") - @Key( REINDEX_BUFFER_SIZE ) + @Key(REINDEX_BUFFER_SIZE) int getReindexBufferSize(); + /** + * Flag to resolve the LOCAL queue implementation service synchronously. + */ + @Default("false") + @Key("elasticsearch.queue_impl.resolution") + boolean resolveSynchronously(); + + /** + * Get the message TTL in milliseconds. Defaults to 24 hours + * + * 24 * 60 * 60 * 1000 + * + * @return + */ + @Default("86400000") + @Key( "elasticsearch.message.ttl" ) + int getIndexMessageTtl(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java index 4b4218a,0000000..8be6099 mode 100644,000000..100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java @@@ -1,98 -1,0 +1,108 @@@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ + +package org.apache.usergrid.persistence.queue; + +import rx.Observable; + +import java.io.IOException; +import java.util.AbstractQueue; ++import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Default queue manager implementation, uses in memory linked queue + */ +public class LocalQueueManager implements QueueManager { + public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); + + @Override + public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { + List<QueueMessage> returnQueue = new ArrayList<>(); + try { + QueueMessage message=null; + int count = 5; + do { + message = queue.poll(100, TimeUnit.MILLISECONDS); + if (message != null) { + returnQueue.add(message); + } + }while(message!=null && count-->0); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); + } + return Observable.from( returnQueue); + } + + @Override + public long getQueueDepth() { + return queue.size(); + } + + @Override + public void commitMessage(QueueMessage queueMessage) { + } + + @Override + public void commitMessages(List<QueueMessage> queueMessages) { + } + + @Override + public void sendMessages(List bodies) throws IOException { + for(Object body : bodies){ + String uuid = UUID.randomUUID().toString(); + try { + queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here")); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); + } + } + } + ++ + @Override - public void sendMessage(Object body) throws IOException { ++ public <T extends Serializable> void sendMessage( final T body ) throws IOException { + String uuid = UUID.randomUUID().toString(); + try { + queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here")); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); + } + } + ++ ++ ++ @Override ++ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { ++ sendMessage( body ); ++ } ++ ++ + @Override + public void deleteQueue() { + + } +}
