http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java deleted file mode 100644 index 625a8fd..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.junit.Rule; -import org.junit.runner.RunWith; - -import org.apache.usergrid.corepersistence.TestIndexModule; -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.asyncevents.AmazonAsyncEventService; -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.index.impl.EsRunner; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; - -import com.google.inject.Inject; - -import net.jcip.annotations.NotThreadSafe; - -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - - -@RunWith( EsRunner.class ) -@UseModules( { TestIndexModule.class } ) -@NotThreadSafe -public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { - - - - @Rule - public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); - - - - @Inject - public QueueManagerFactory queueManagerFactory; - - @Inject - public IndexProcessorFig indexProcessorFig; - - @Inject - public QueueFig queueFig; - - - @Inject - public MetricsFactory metricsFactory; - - @Inject - public RxTaskScheduler rxTaskScheduler; - - @Inject - public EventBuilder eventBuilder; - - @Inject - public IndexProducer indexProducer; - - @Inject - public IndexLocationStrategyFactory indexLocationStrategyFactory; - - @Inject - public MapManagerFactory mapManagerFactory; - - - @Inject - public EntityIndexFactory entityIndexFactory; - - @Override - protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); - } - - - - - -}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java new file mode 100644 index 0000000..c915464 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.QueueFig; +import org.junit.Rule; +import org.junit.runner.RunWith; + +import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; + +import net.jcip.annotations.NotThreadSafe; + +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +@RunWith( EsRunner.class ) +@UseModules( { TestIndexModule.class } ) +@NotThreadSafe +public class AsyncEventServiceImplTest extends AsyncIndexServiceTest { + + + + @Rule + public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); + + + + @Inject + public QueueManagerFactory queueManagerFactory; + + @Inject + public IndexProcessorFig indexProcessorFig; + + @Inject + public QueueFig queueFig; + + + @Inject + public MetricsFactory metricsFactory; + + @Inject + public RxTaskScheduler rxTaskScheduler; + + @Inject + public EventBuilder eventBuilder; + + @Inject + public IndexProducer indexProducer; + + @Inject + public IndexLocationStrategyFactory indexLocationStrategyFactory; + + @Inject + public MapManagerFactory mapManagerFactory; + + + @Inject + public EntityIndexFactory entityIndexFactory; + + @Override + protected AsyncEventService getAsyncEventService() { + return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); + } + + + + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 74f9ce0..12a33cf 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -55,7 +55,6 @@ import com.google.inject.Inject; import net.jcip.annotations.NotThreadSafe; import rx.Observable; -import rx.schedulers.Schedulers; import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; @@ -145,7 +144,7 @@ public abstract class AsyncIndexServiceTest { //queue up processing - asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); + asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0); final EntityIndex EntityIndex = http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java index 4782bea..62102b4 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.index.impl; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.codahale.metrics.Histogram; @@ -130,7 +131,9 @@ public class EsIndexProducerImpl implements IndexProducer { final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex); //buffer into the max size we can send ES and fire them all off until we're completed - final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize()) + final Observable<BulkRequestBuilder> requests = batchOps + .buffer(indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS) + //flatten the buffer into a single batch execution .flatMap(individualOps -> Observable.from(individualOps) //collect them http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index cdab3e0..88ad3ff 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -82,4 +82,8 @@ public interface QueueFig extends GuicyFig { @Key( "usergrid.queue.visibilityTimeout" ) @Default("5000") // 5 seconds int getVisibilityTimeout(); + + @Key( "usergrid.queue.localquorum.timeout") + @Default("3000") // 3 seconds + int getLocalQuorumTimeout(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java index c439b49..0507818 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java @@ -292,7 +292,10 @@ public class ServiceManager { } } catch ( ClassNotFoundException e1 ) { - logger.error("Could not load class", e1); + if(logger.isTraceEnabled()){ + logger.trace("Could not find class", e1); + } + } return null; }
