Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 8d965b3fb -> 078666ec1
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index e55fcc2..6bcc405 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index.impl; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -33,19 +32,19 @@ import org.elasticsearch.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.IndexFig; import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; import rx.Subscriber; -import rx.Subscription; import rx.schedulers.Schedulers; @@ -54,179 +53,110 @@ import rx.schedulers.Schedulers; */ @Singleton public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { - private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class); + private static final Logger log = LoggerFactory.getLogger( 
EsIndexBufferConsumerImpl.class ); private final IndexFig config; private final FailureMonitorImpl failureMonitor; private final Client client; private final Timer flushTimer; - private final Counter indexSizeCounter; private final Counter indexErrorCounter; private final Meter flushMeter; private final Timer produceTimer; - private final BufferQueue bufferQueue; private final IndexFig indexFig; - private final AtomicLong counter = new AtomicLong( ); + private final AtomicLong counter = new AtomicLong(); + + + private final Counter indexSizeCounter; - //the actively running subscription - private List<Subscription> subscriptions; + private final Timer offerTimer; - private Object mutex = new Object(); + private final BufferProducer bufferProducer; + + + private AtomicLong inFlight = new AtomicLong(); - private AtomicLong inFlight = new AtomicLong( ); @Inject - public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory - metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){ + public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, + final MetricsFactory metricsFactory, final IndexFig indexFig ) { - this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "buffer.flush"); - this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "buffer.meter"); - this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "buffer.size"); - this.indexErrorCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "error.count"); + this.flushTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "buffer.flush" ); + this.flushMeter = metricsFactory.getMeter( EsIndexBufferConsumerImpl.class, "buffer.meter" ); + this.indexSizeCounter = metricsFactory.getCounter( EsIndexBufferConsumerImpl.class, "buffer.size" ); + this.indexErrorCounter = metricsFactory.getCounter( EsIndexBufferConsumerImpl.class, "error.count" ); + 
this.offerTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index.buffer.producer.timer" ); //wire up the gauge of inflight messages - metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", new Gauge<Long>() { - @Override - public Long getValue() { - return inFlight.longValue(); - } - } ); - + metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", () -> inFlight.longValue() ); this.config = config; - this.failureMonitor = new FailureMonitorImpl(config,provider); + this.failureMonitor = new FailureMonitorImpl( config, provider ); this.client = provider.getClient(); - this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch"); - this.bufferQueue = bufferQueue; + this.produceTimer = + metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index.buffer.consumer.messageFetch" ); this.indexFig = indexFig; - subscriptions = new ArrayList<>( indexFig.getWorkerCount() ); + this.bufferProducer = new BufferProducer(); //batch up sets of some size and send them in batch - start(); - } + startSubscription(); + } - /** - * Loop throught and start the workers - */ - public void start() { - final int count = indexFig.getWorkerCount(); - for(int i = 0; i < count; i ++){ - startWorker(); - } + public BetterFuture put( IndexOperationMessage message ) { + Preconditions.checkNotNull( message, "Message cannot be null" ); + indexSizeCounter.inc( message.getDeIndexRequests().size() ); + indexSizeCounter.inc( message.getIndexRequests().size() ); + Timer.Context time = offerTimer.time(); + bufferProducer.send( message ); + time.stop(); + return message.getFuture(); } /** - * Stop the workers + * Start the subscription */ - public void stop() { - synchronized ( mutex ) { - //stop consuming - - for(final Subscription subscription: subscriptions){ - subscription.unsubscribe(); - } - } - } - - - private void startWorker(){ - synchronized ( mutex) { - - 
Observable<List<IndexIdentifierImpl.IndexOperationMessage>> consumer = Observable.create( - new Observable.OnSubscribe<List<IndexIdentifierImpl.IndexOperationMessage>>() { - @Override - public void call( final Subscriber<? super List<IndexIdentifierImpl.IndexOperationMessage>> subscriber ) { - - //name our thread so it's easy to see - Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - - - List<IndexIdentifierImpl.IndexOperationMessage> drainList = null; - - do { - - Timer.Context timer = produceTimer.time(); + private void startSubscription() { - try { + Observable.create( bufferProducer ) + .buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, + Schedulers.newThread() ) + .doOnNext( containerList -> { + if ( containerList.size() == 0 ) { + return; + } - drainList = bufferQueue - .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), - TimeUnit.MILLISECONDS ); + flushMeter.mark( containerList.size() ); + Timer.Context time = flushTimer.time(); - subscriber.onNext( drainList ); + execute( containerList ); - //take since we're in flight - inFlight.addAndGet( drainList.size() ); - - - timer.stop(); - } - //DO NOT add any doOnError* functions to this subscription. We want the producer - //to receive these exceptions and sleep before a retry - catch ( Throwable t ) { - final long sleepTime = config.getFailureRetryTime(); - - log.error( "Failed to dequeue. 
Sleeping for {} milliseconds", sleepTime, t ); - - if ( drainList != null ) { - inFlight.addAndGet( -1 * drainList.size() ); - } - - try { - Thread.sleep( sleepTime ); - } - catch ( InterruptedException ie ) { - //swallow - } - - indexErrorCounter.inc(); - } - } - while ( true ); - } - } ).doOnNext( containerList -> { - if ( containerList.size() == 0 ) { - return; - } - - flushMeter.mark(containerList.size()); - Timer.Context time = flushTimer.time(); - - - execute(containerList); - - time.stop(); - } ) + time.stop(); + } ) //ack after we process - .doOnNext( indexOperationMessages -> { - bufferQueue.ack( indexOperationMessages ); - //release so we know we've done processing - inFlight.addAndGet( -1 * indexOperationMessages.size() ); - } ).subscribeOn( Schedulers.newThread() ); + .doOnNext( indexOperationMessages -> { - //start in the background + //release so we know we've done processing + inFlight.addAndGet( -1 * indexOperationMessages.size() ); + } ).subscribe(); - final Subscription subscription = consumer.subscribe(); + //start in the background - subscriptions.add(subscription ); - } } /** * Execute the request, check for errors, then re-init the batch for future use */ - private void execute( final List<IndexIdentifierImpl.IndexOperationMessage> operationMessages ) { + private void execute( final List<IndexOperationMessage> operationMessages ) { if ( operationMessages == null || operationMessages.size() == 0 ) { return; @@ -250,28 +180,28 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).toBlocking().lastOrDefault( null ); //call back all futures - Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() ).toBlocking().lastOrDefault( null ); + Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() ) + .toBlocking().lastOrDefault( null ); } /** * initialize request - * 
@return */ private BulkRequestBuilder initRequest() { BulkRequestBuilder bulkRequest = client.prepareBulk(); - bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel())); - bulkRequest.setRefresh(config.isForcedRefresh()); + bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) ); + bulkRequest.setRefresh( config.isForcedRefresh() ); return bulkRequest; } + /** * send bulk request - * @param bulkRequest */ - private void sendRequest(BulkRequestBuilder bulkRequest) { + private void sendRequest( BulkRequestBuilder bulkRequest ) { //nothing to do, we haven't added anything to the index - if (bulkRequest.numberOfActions() == 0) { + if ( bulkRequest.numberOfActions() == 0 ) { return; } @@ -280,9 +210,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { try { responses = bulkRequest.execute().actionGet(); - } catch (Throwable t) { - log.error("Unable to communicate with elasticsearch"); - failureMonitor.fail("Unable to execute batch", t); + } + catch ( Throwable t ) { + log.error( "Unable to communicate with elasticsearch" ); + failureMonitor.fail( "Unable to execute batch", t ); throw t; } @@ -290,23 +221,43 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { boolean error = false; - for (BulkItemResponse response : responses) { + for ( BulkItemResponse response : responses ) { - if (response.isFailed()) { + if ( response.isFailed() ) { // log error and continue processing - log.error("Unable to index id={}, type={}, index={}, failureMessage={} ", - response.getId(), - response.getType(), - response.getIndex(), - response.getFailureMessage() - ); + log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(), + response.getType(), response.getIndex(), response.getFailureMessage() ); error = true; } } if ( error ) { - throw new RuntimeException("Error during processing of bulk index operations one of the 
responses failed. Check previous log entries"); + throw new RuntimeException( + "Error during processing of bulk index operations one of the responses failed. Check previous log " + + "entries" ); + } + } + + + public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> { + + private Subscriber<? super IndexOperationMessage> subscriber; + + + /** + * Send the data through the buffer + */ + public void send( final IndexOperationMessage indexOp ) { + + subscriber.onNext( indexOp ); + } + + + @Override + public void call( final Subscriber<? super IndexOperationMessage> subscriber ) { + //just assigns for later use, doesn't do anything else + this.subscriber = subscriber; } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java deleted file mode 100644 index 9ba92d0..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. 
- * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. - * - */ -package org.apache.usergrid.persistence.index.impl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.usergrid.persistence.core.future.BetterFuture; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; - - -/** - * Producer for index operation messages - */ -@Singleton -public class EsIndexBufferProducerImpl implements IndexBufferProducer { - - private final Counter indexSizeCounter; - - private final Timer timer; - private final BufferQueue bufferQueue; - - @Inject - public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){ - this.bufferQueue = bufferQueue; - this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size"); - this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer"); - } - - public BetterFuture put(IndexIdentifierImpl.IndexOperationMessage message){ - Preconditions.checkNotNull(message, "Message cannot be null"); - indexSizeCounter.inc(message.getDeIndexRequests().size()); - indexSizeCounter.inc(message.getIndexRequests().size()); - Timer.Context time = timer.time(); - bufferQueue.offer( message ); - time.stop(); - return message.getFuture(); - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java index fe31d35..fed9a50 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java @@ -98,30 +98,4 @@ public class FailureMonitorImpl implements FailureMonitor { public void success() { failCounter.set( 0 ); } - - - /** - * Identifier for where an index is in underlying server - */ - public static interface IndexIdentifier { - - /** - * get the alias name - * @return - */ - IndexAlias getAlias(); - - /** - * get index name from suffix - * @param suffix - * @return - */ - String getIndex( String suffix ); - - /** - * return unique string - * @return - */ - String toString(); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java new file mode 100644 index 0000000..4b84db7 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.index.impl; + + +public class FlushBufferQueue {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java index b8e733d..3258444 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java @@ -19,6 +19,10 @@ */ package org.apache.usergrid.persistence.index.impl; + +import org.apache.usergrid.persistence.core.future.BetterFuture; + + /** * Buffer index requests */ @@ -26,12 +30,9 @@ public interface IndexBufferConsumer { /** - * Start the consumer - */ - void start(); - - /** - * Stop the consumers + * Put this operation into 
our collapsing buffer + * @param message + * @return */ - void stop(); + BetterFuture put(IndexOperationMessage message); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java deleted file mode 100644 index 36cb180..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. 
- * - */ -package org.apache.usergrid.persistence.index.impl; - -import org.apache.usergrid.persistence.core.future.BetterFuture; - - -/** - * Produce to index buffer consumer - */ -public interface IndexBufferProducer { - - BetterFuture put(IndexIdentifierImpl.IndexOperationMessage message); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java new file mode 100644 index 0000000..332c7b7 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.index.impl; + + +/** + * Identifier for where an index is in underlying server + */ +public interface IndexIdentifier { + + /** + * get the alias name + * @return + */ + IndexAlias getAlias(); + + /** + * get index name from suffix + * @param suffix + * @return + */ + String getIndex( String suffix ); + + /** + * return unique string + * @return + */ + String toString(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java index c47b2c8..782625b 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java @@ -20,22 +20,15 @@ package org.apache.usergrid.persistence.index.impl; -import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.Callable; - -import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.inject.Inject; -import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.index.IndexFig; /** * Class is used to generate an index name and alias name */ -public class IndexIdentifierImpl implements FailureMonitorImpl.IndexIdentifier { +public class IndexIdentifierImpl implements IndexIdentifier { private final IndexFig config; @Inject @@ -71,113 +64,4 @@ public class IndexIdentifierImpl implements FailureMonitorImpl.IndexIdentifier { public String toString() { return "index 
id"+config.getIndexPrefix(); } - - - /** - * Container for index operations. - */ - public static class IndexOperationMessage implements Serializable { - private final Set<IndexRequest> indexRequests; - private final Set<DeIndexRequest> deIndexRequests; - - - - private final BetterFuture<IndexOperationMessage> containerFuture; - - - public IndexOperationMessage() { - final IndexOperationMessage parent = this; - this.indexRequests = new HashSet<>(); - this.deIndexRequests = new HashSet<>(); - this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() { - @Override - public IndexOperationMessage call() throws Exception { - return parent; - } - } ); - } - - - public void addIndexRequest( final IndexRequest indexRequest ) { - indexRequests.add( indexRequest ); - } - - - public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) { - this.indexRequests.addAll( indexRequests ); - } - - - public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) { - this.deIndexRequests.add( deIndexRequest ); - } - - - public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) { - this.deIndexRequests.addAll( deIndexRequests ); - } - - - public Set<IndexRequest> getIndexRequests() { - return indexRequests; - } - - - public Set<DeIndexRequest> getDeIndexRequests() { - return deIndexRequests; - } - - - @JsonIgnore - public boolean isEmpty(){ - return indexRequests.isEmpty() && deIndexRequests.isEmpty(); - } - - /** - * return the promise - */ - @JsonIgnore - public BetterFuture<IndexOperationMessage> getFuture() { - return containerFuture; - } - - - @Override - public boolean equals( final Object o ) { - if ( this == o ) { - return true; - } - if ( o == null || getClass() != o.getClass() ) { - return false; - } - - final IndexOperationMessage that = ( IndexOperationMessage ) o; - - if ( !deIndexRequests.equals( that.deIndexRequests ) ) { - return false; - } - if ( !indexRequests.equals( that.indexRequests ) ) { - return false; 
- } - - return true; - } - - - @Override - public int hashCode() { - int result = indexRequests.hashCode(); - result = 31 * result + deIndexRequests.hashCode(); - return result; - } - - public void done() { - //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack - final BetterFuture<IndexOperationMessage> future = getFuture(); - - if(future != null ){ - future.done(); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java new file mode 100644 index 0000000..1a60026 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.index.impl; + + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.usergrid.persistence.core.future.BetterFuture; + +import com.fasterxml.jackson.annotation.JsonIgnore; + + +/** + * Container for index operations. + */ +public class IndexOperationMessage implements Serializable { + private final Set<IndexRequest> indexRequests; + private final Set<DeIndexRequest> deIndexRequests; + + + + private final BetterFuture<IndexOperationMessage> containerFuture; + + + public IndexOperationMessage() { + final IndexOperationMessage parent = this; + this.indexRequests = new HashSet<>(); + this.deIndexRequests = new HashSet<>(); + this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() { + @Override + public IndexOperationMessage call() throws Exception { + return parent; + } + } ); + } + + + public void addIndexRequest( final IndexRequest indexRequest ) { + indexRequests.add( indexRequest ); + } + + + public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) { + this.indexRequests.addAll( indexRequests ); + } + + + public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) { + this.deIndexRequests.add( deIndexRequest ); + } + + + public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) { + this.deIndexRequests.addAll( deIndexRequests ); + } + + + public Set<IndexRequest> getIndexRequests() { + return indexRequests; + } + + + public Set<DeIndexRequest> getDeIndexRequests() { + return deIndexRequests; + } + + + @JsonIgnore + public boolean isEmpty(){ + return indexRequests.isEmpty() && deIndexRequests.isEmpty(); + } + + /** + * return the promise + */ + @JsonIgnore + public BetterFuture<IndexOperationMessage> getFuture() { + return containerFuture; + } + + + @Override + public boolean equals( final Object o ) { + if ( this == o ) { + return true; + } + if ( o == null || 
getClass() != o.getClass() ) { + return false; + } + + final IndexOperationMessage that = ( IndexOperationMessage ) o; + + if ( !deIndexRequests.equals( that.deIndexRequests ) ) { + return false; + } + if ( !indexRequests.equals( that.indexRequests ) ) { + return false; + } + + return true; + } + + + @Override + public int hashCode() { + int result = indexRequests.hashCode(); + result = 31 * result + deIndexRequests.hashCode(); + return result; + } + + public void done() { + //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack + final BetterFuture<IndexOperationMessage> future = getFuture(); + + if(future != null ){ + future.done(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index f3663cd..5052ddf 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -61,14 +61,14 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { private final IndexAlias alias; private final IndexCache indexCache; private final EsProvider esProvider; - private final IndexBufferProducer producer; + private final IndexBufferConsumer producer; private final IndexFig indexFig; private final Timer timer; @Inject - public IndexRefreshCommandImpl( FailureMonitorImpl.IndexIdentifier indexIdentifier, EsProvider esProvider, - IndexBufferProducer producer, IndexFig 
indexFig, MetricsFactory metricsFactory, + public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider, + IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory, final IndexCache indexCache ) { @@ -105,7 +105,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { IndexRequest indexRequest = new IndexRequest( alias.getWriteAlias(), docId, entityData ); //save the item - IndexIdentifierImpl.IndexOperationMessage message = new IndexIdentifierImpl.IndexOperationMessage(); + IndexOperationMessage message = new IndexOperationMessage(); message.addIndexRequest( indexRequest ); producer.put( message ); @@ -153,8 +153,8 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { new DeIndexRequest( aliases, appScope, edge, entity.getId(), entity.getVersion() ); //delete the item - IndexIdentifierImpl.IndexOperationMessage indexOperationMessage = - new IndexIdentifierImpl.IndexOperationMessage(); + IndexOperationMessage indexOperationMessage = + new IndexOperationMessage(); indexOperationMessage.addDeIndexRequest( deIndexRequest ); producer.put( indexOperationMessage ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java index c60de42..c93fd86 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java @@ -20,15 +20,15 @@ package 
org.apache.usergrid.persistence.index.migration; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.index.impl.FailureMonitorImpl; import org.apache.usergrid.persistence.index.impl.IndexAlias; import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.impl.IndexIdentifier; import org.apache.usergrid.persistence.index.impl.IndexingUtils; /** * Class is used to generate an index name and alias name the old way via app name */ -public class LegacyIndexIdentifier implements FailureMonitorImpl.IndexIdentifier { +public class LegacyIndexIdentifier implements IndexIdentifier { private final IndexFig config; private final ApplicationScope applicationScope; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java index 20daf82..410f0e3 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java @@ -19,27 +19,17 @@ package org.apache.usergrid.persistence.index.guice; -import com.google.inject.Inject; -import com.google.inject.TypeLiteral; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import 
org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.impl.IndexBufferProducer; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl; -import org.apache.usergrid.persistence.index.impl.IndexCache; -import org.apache.usergrid.persistence.index.impl.EsProvider; -import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier; -import org.apache.usergrid.persistence.model.entity.SimpleId; import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.core.guice.CommonModule; import org.apache.usergrid.persistence.core.guice.TestModule; -import rx.Observable; +import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import java.util.UUID; +import com.google.inject.Inject; +import com.google.inject.TypeLiteral; + +import rx.Observable; public class TestIndexModule extends TestModule { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java deleted file mode 100644 index 43e581a..0000000 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.index.impl; - - -import java.util.*; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchType; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.guice.TestIndexModule; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.NoAWSCredsRule; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider; - -import com.google.inject.Inject; - -import net.jcip.annotations.NotThreadSafe; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - - -@RunWith(EsRunner.class) -@UseModules({ TestIndexModule.class }) 
-@NotThreadSafe -public class BufferQueueSQSImplTest { - - - @Inject - @Rule - public MigrationManagerRule migrationManagerRule; - - - @Rule - public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule(); - - @Inject - public QueueManagerFactory queueManagerFactory; - - @Inject - public IndexFig indexFig; - - @Inject - public MapManagerFactory mapManagerFactory; - - @Inject - public MetricsFactory metricsFactory; - - - private BufferQueueSQSImpl bufferQueueSQS; - - @Before - public void setup(){ - bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory ); - } - - - - - @Test - public void testMessageIndexing(){ - - ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application")); - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null ); - assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null ); - - final Map<String, Object> request1Data = new HashMap<String, Object>() {{put("test", "testval1");}}; - final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testDoc1",request1Data ); - - - final Map<String, Object> request2Data = new HashMap<String, Object>() {{put("test", "testval2");}}; - final IndexRequest indexRequest2 = new IndexRequest( "testAlias2", "testDoc2",request2Data ); - - - //de-index request - final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId3"),"name3", - - - SearchEdge.NodeType.SOURCE ), new SimpleId("id3"), UUID.randomUUID() ); - - final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId4"),"name4", - SearchEdge.NodeType.SOURCE ), new SimpleId("id4"), UUID.randomUUID() ); - - - - - IndexIdentifierImpl.IndexOperationMessage 
indexOperationMessage = new IndexIdentifierImpl.IndexOperationMessage(); - indexOperationMessage.addIndexRequest( indexRequest1); - indexOperationMessage.addIndexRequest( indexRequest2); - - indexOperationMessage.addDeIndexRequest( deIndexRequest1 ); - indexOperationMessage.addDeIndexRequest( deIndexRequest2 ); - - bufferQueueSQS.offer( indexOperationMessage ); - - //wait for it to send to SQS - indexOperationMessage.getFuture().get(); - - //now get it back - - final List<IndexIdentifierImpl.IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS ); - - assertTrue(ops.size() > 0); - - final IndexIdentifierImpl.IndexOperationMessage returnedOperation = ops.get( 0 ); - - //get the operations out - - final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests(); - - assertTrue(indexRequestSet.contains(indexRequest1)); - assertTrue(indexRequestSet.contains(indexRequest2)); - - - final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests(); - - assertTrue( deIndexRequests.contains( deIndexRequest1 ) ); - assertTrue( deIndexRequests.contains( deIndexRequest2 ) ); - - - - //now ack the message - - bufferQueueSQS.ack( ops ); - - } - - private List<IndexIdentifierImpl.IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){ - final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout ); - - List<IndexIdentifierImpl.IndexOperationMessage> ops; - - do{ - ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS ); - }while((ops == null || ops.size() == 0 ) && System.currentTimeMillis() < endTime); - - return ops; - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java deleted file mode 100644 index ba0dc1f..0000000 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.queue; - - -import org.junit.Assume; -import org.junit.internal.runners.model.MultipleFailureException; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -import com.amazonaws.AmazonClientException; - - -/** - * Created in an attempt to mark no aws cred tests as ignored. 
Blocked by this issue - * https://github.com/junit-team/junit/issues/116 - * - * Until then, simply marks as passed, which is a bit dangerous - */ -public class NoAWSCredsRule implements TestRule { - - public Statement apply( final Statement base, final Description description ) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - - try { - base.evaluate(); - } - catch ( Throwable t ) { - - if ( !isMissingCredsException( t ) ) { - throw t; - } - - //do this so our test gets marked as ignored. Not pretty, but it works - Assume.assumeTrue( false ); - - - } - } - }; - } - - - private boolean isMissingCredsException( final Throwable t ) { - - if ( t instanceof AmazonClientException ) { - - final AmazonClientException ace = ( AmazonClientException ) t; - - if ( ace.getMessage().contains( "could not get aws access key" ) || ace.getMessage().contains( - "could not get aws secret key from system properties" ) ) { - //swallow - return true; - } - } - - /** - * Handle the multiple failure junit trace - */ - if( t instanceof MultipleFailureException ){ - for(final Throwable failure : ((MultipleFailureException)t).getFailures()){ - final boolean isMissingCreds = isMissingCredsException( failure ); - - if(isMissingCreds){ - return true; - } - } - } - final Throwable cause = t.getCause(); - - if ( cause == null ) { - return false; - } - - - return isMissingCredsException( cause ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java index eecb9e1..452d328 100644 --- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java @@ -27,16 +27,15 @@ import java.util.List; import java.util.Map; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.test.ITRunner; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.queue.guice.TestQueueModule; import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; -import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider; import com.google.inject.Inject; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/services/pom.xml ---------------------------------------------------------------------- diff --git a/stack/services/pom.xml b/stack/services/pom.xml index 6074e65..2f1ccdf 100644 --- a/stack/services/pom.xml +++ b/stack/services/pom.xml @@ -284,42 +284,42 @@ <artifactId>slf4j-api</artifactId> </dependency> - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <classifier>tests</classifier> - </dependency> + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>usergrid-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>queue</artifactId> + <artifactId>common</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid-config</artifactId> - <version>${project.version}</version> 
- <scope>test</scope> - <classifier>tests</classifier> - </dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>usergrid-config</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>usergrid-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java index e6970b5..fbf8290 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java @@ -16,33 +16,28 @@ */ package org.apache.usergrid.services.notifications; -import org.apache.commons.io.IOUtils; -import org.apache.usergrid.persistence.queue.NoAWSCredsRule; -import org.apache.usergrid.services.notifications.apns.MockSuccessfulProviderAdapter; -import org.apache.usergrid.persistence.entities.Notifier; +import java.io.InputStream; +import java.lang.reflect.Field; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import 
org.junit.Rule; import org.junit.Test; -import org.apache.usergrid.services.notifications.ConnectionException; -import org.apache.usergrid.services.notifications.NotificationsService; -import java.io.InputStream; -import java.lang.reflect.Field; -import java.net.SocketException; +import org.apache.commons.io.IOUtils; + import org.apache.usergrid.persistence.Schema; +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.entities.Notifier; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.services.AbstractServiceIT; import org.apache.usergrid.services.ServiceAction; import org.apache.usergrid.setup.ConcurrentProcessSingleton; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertArrayEquals; - +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class NotifiersServiceIT extends AbstractServiceIT {
