Merge branch 'refs/heads/2.1-release' into USERGRID-1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3e155852 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3e155852 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3e155852 Branch: refs/heads/2.1-release Commit: 3e1558524728c96834cfd66d9e53e3f1b6a7d3d6 Parents: 04a3f47 a09485a Author: Todd Nine <[email protected]> Authored: Mon Oct 19 13:30:04 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Oct 19 13:30:04 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 149 ++++++++++--------- .../asyncevents/AsyncIndexProvider.java | 26 ++-- .../asyncevents/model/AsyncEvent.java | 14 +- .../asyncevents/model/EdgeDeleteEvent.java | 6 +- .../asyncevents/model/EdgeIndexEvent.java | 9 +- .../asyncevents/model/EntityDeleteEvent.java | 8 +- .../asyncevents/model/EntityIndexEvent.java | 6 +- .../model/InitializeApplicationIndexEvent.java | 4 +- .../index/AmazonAsyncEventServiceTest.java | 6 +- .../cache/CachedEntityCollectionManager.java | 147 ------------------ .../EntityCollectionManagerFactoryImpl.java | 6 - .../usergrid/persistence/queue/QueueFig.java | 2 +- .../queue/impl/SNSQueueManagerImpl.java | 8 +- 13 files changed, 135 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index d319ac8,c198674..f8ef5e7 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@@ -97,7 -84,9 +100,8 @@@ public class AmazonAsyncEventService im public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars private final QueueManager queue; - private final QueueScope queueScope; private final IndexProcessorFig indexProcessorFig; + private final QueueFig queueFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; @@@ -125,33 -113,28 +129,35 @@@ @Inject - public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory, - final IndexProcessorFig indexProcessorFig, - final IndexProducer indexProducer, - final MetricsFactory metricsFactory, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory, - final EventBuilder eventBuilder, - final RxTaskScheduler rxTaskScheduler, - QueueFig queueFig) { + public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, + final IndexProcessorFig indexProcessorFig, + final IndexProducer indexProducer, + final MetricsFactory metricsFactory, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, + final EventBuilder eventBuilder, + final MapManagerFactory mapManagerFactory, ++ final QueueFig queueFig, + final RxTaskScheduler rxTaskScheduler ) { this.indexProducer = indexProducer; this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = entityIndexFactory; this.eventBuilder = eventBuilder; + + final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents"); + + this.esMapPersistence = mapManagerFactory.createMapManager( mapScope ); + this.rxTaskScheduler = rxTaskScheduler; - this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); + QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); this.queue = queueManagerFactory.getQueueManager(queueScope); + this.indexProcessorFig = indexProcessorFig; + this.queueFig = queueFig; this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read"); @@@ -271,70 -259,64 +277,78 @@@ logger.debug("callEventHandlers with {} message", messages.size()); } -- Stream<IndexEventResult> indexEventResults = messages.stream() -- .map(message -> { -- AsyncEvent event = null; -- try { -- event = (AsyncEvent) message.getBody(); -- } catch (ClassCastException cce) { -- logger.error("Failed to deserialize message body", cce); ++ Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> { ++ AsyncEvent event = null; ++ try { ++ event = ( AsyncEvent ) message.getBody(); ++ } ++ catch ( ClassCastException cce ) { ++ logger.error( "Failed to deserialize message body", cce ); ++ } ++ ++ if ( event == null ) { ++ logger.error( "AsyncEvent type or event is null!" ); ++ return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(), ++ System.currentTimeMillis() ); ++ } ++ ++ final AsyncEvent thisEvent = event; ++ if ( logger.isDebugEnabled() ) { ++ logger.debug( "Processing {} event", event ); ++ } ++ ++ try { ++ //check for empty sets if this is true ++ boolean validateEmptySets = true; ++ Observable<IndexOperationMessage> indexoperationObservable; ++ //merge each operation to a master observable; ++ if ( event instanceof EdgeDeleteEvent ) { ++ indexoperationObservable = handleEdgeDelete( message ); ++ } ++ else if ( event instanceof EdgeIndexEvent ) { ++ indexoperationObservable = handleEdgeIndex( message ); ++ } ++ else if ( event instanceof EntityDeleteEvent ) { ++ indexoperationObservable = handleEntityDelete( message ); ++ } ++ else if ( event instanceof EntityIndexEvent ) { ++ indexoperationObservable = handleEntityIndexUpdate( message ); ++ } ++ else if ( event instanceof InitializeApplicationIndexEvent ) { ++ //does not return observable ++ handleInitializeApplicationIndex( event, message ); ++ indexoperationObservable = Observable.just( new IndexOperationMessage() ); ++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. ++ } ++ else if ( event instanceof ElasticsearchIndexEvent ) { ++ handleIndexOperation( ( ElasticsearchIndexEvent ) event ); ++ indexoperationObservable = Observable.just( new IndexOperationMessage() ); ++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. } -- if (event == null) { -- logger.error("AsyncEvent type or event is null!"); -- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis()); ++ else { ++ throw new Exception( "Unknown EventType" );//TODO: print json instead } -- final AsyncEvent thisEvent = event; -- if (logger.isDebugEnabled()) { -- logger.debug("Processing {} event", event); ++ //collect all of the ++ IndexOperationMessage indexOperationMessage = indexoperationObservable ++ .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) ) ++ .toBlocking().lastOrDefault( null ); ++ ++ if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) { ++ logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(), ++ message.getStringBody() ); ++ throw new Exception( "Received empty index sequence." ); } -- try { -- //check for empty sets if this is true -- boolean validateEmptySets = true; -- Observable<IndexOperationMessage> indexoperationObservable; -- //merge each operation to a master observable; -- if (event instanceof EdgeDeleteEvent) { -- indexoperationObservable = handleEdgeDelete(message); -- } else if (event instanceof EdgeIndexEvent) { -- indexoperationObservable = handleEdgeIndex(message); -- } else if (event instanceof EntityDeleteEvent) { -- indexoperationObservable = handleEntityDelete(message); -- } else if (event instanceof EntityIndexEvent) { -- indexoperationObservable = handleEntityIndexUpdate(message); -- } else if (event instanceof InitializeApplicationIndexEvent) { -- //does not return observable -- handleInitializeApplicationIndex(event, message); -- indexoperationObservable = Observable.just(new IndexOperationMessage()); - validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. - } else if (event instanceof ElasticsearchIndexEvent){ - handleIndexOperation( (ElasticsearchIndexEvent)event ); - indexoperationObservable = Observable.just( new IndexOperationMessage() ); -- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. - } - - else { - } else { -- throw new Exception("Unknown EventType");//TODO: print json instead -- } -- -- //collect all of the -- IndexOperationMessage indexOperationMessage = -- indexoperationObservable -- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) -- .toBlocking().lastOrDefault(null); -- -- if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) { -- logger.error("Received empty index sequence message:({}), body:({}) ", -- message.getMessageId(), message.getStringBody()); -- throw new Exception("Received empty index sequence."); -- } -- -- //return type that can be indexed and ack'd later -- return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime()); -- } catch (Exception e) { -- logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e); -- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime()); ++ //return type that can be indexed and ack'd later ++ return new IndexEventResult( Optional.fromNullable( message ), ++ Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() ); ++ } ++ catch ( Exception e ) { ++ logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e ); ++ return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(), ++ event.getCreationTime()); } }); @@@ -346,8 -328,7 +360,7 @@@ public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope ); - offerTopic( - new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); - offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy))); ++ offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy))); } @@@ -413,8 -394,8 +426,8 @@@ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); -- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge( -- applicationScope, entity, edge)); ++ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap( ++ entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge)); return edgeIndexObservable; } @@@ -450,84 -431,9 +463,84 @@@ @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); + offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } + + /** + * Queue up an indexOperationMessage for multi region execution + * @param indexOperationMessage + */ + public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { + + final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); + + final UUID newMessageId = UUIDGenerator.newTimeUUID(); + + //write to the map in ES + esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() ); + + + + //now queue up the index message + + final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId ); + + //send to the topic so all regions index the batch + + offerTopic( elasticsearchIndexEvent ); + } + + public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ + Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); + + final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); + + Preconditions.checkNotNull( messageId, "messageId must not be null" ); + + + //load the entity + + final String message = esMapPersistence.getString( messageId.toString() ); + + String highConsistency = null; + + if(message == null){ + logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" ); + + highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); + + } + + //read the value from the string + + final IndexOperationMessage indexOperationMessage; + + //our original local read has it, parse it. + if(message != null){ + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); + } + //we tried to read it at a higher consistency level and it works + else if (highConsistency != null){ + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class ); + } + + //we couldn't find it, bail + else{ + logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" ); + + throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); + } + + + + //now execute it + indexProducer.put(indexOperationMessage).toBlocking().last(); + + } + + + @Override public long getQueueDepth() { return queue.getQueueDepth(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 3865ecb,8b44714..1649046 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@@ -28,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence. import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.map.MapManagerFactory; + import org.apache.usergrid.persistence.queue.QueueFig; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@@ -52,18 -51,21 +52,24 @@@ public class AsyncIndexProvider impleme private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory entityIndexFactory; private final IndexProducer indexProducer; + private final MapManagerFactory mapManagerFactory; + private final QueueFig queueFig; private AsyncEventService asyncEventService; @Inject - public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, - final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final - EntityCollectionManagerFactory entityCollectionManagerFactory, - final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer, - final MapManagerFactory mapManagerFactory ) { + public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig, + final QueueManagerFactory queueManagerFactory, + final MetricsFactory metricsFactory, + final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EventBuilder eventBuilder, + final IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, - final IndexProducer indexProducer, QueueFig queueFig) { ++ final IndexProducer indexProducer, ++ final MapManagerFactory mapManagerFactory, ++ final QueueFig queueFig) { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; @@@ -74,7 -76,7 +80,8 @@@ this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = entityIndexFactory; this.indexProducer = indexProducer; + this.mapManagerFactory = mapManagerFactory; + this.queueFig = queueFig; } @@@ -98,11 -100,11 +105,10 @@@ case LOCAL: return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); case SQS: -- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig ); ++ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead"); case SNS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig); ++ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index e83d6f8,5b921d9..8ee47a2 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@@ -89,7 -89,7 +93,7 @@@ public class AmazonAsyncEventServiceTes @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler ); - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig ); ++ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index a3fa05e,5ab1a4b..58b2a4d --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@@ -226,44 -171,43 +226,44 @@@ public class SNSQueueManagerImpl implem String multiRegion = fig.getRegionList(); - if (logger.isDebugEnabled()) - logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion); + if ( logger.isDebugEnabled() ) { + logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion ); + } - String[] regionNames = multiRegion.split(","); + String[] regionNames = multiRegion.split( "," ); - final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1); - final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1); + final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 ); + final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 ); - arrQueueArns.put( primaryQueueArn, fig.getRegion() ); - topicArns.put( primaryTopicArn, fig.getRegion() ); + arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion()); + topicArns.put(primaryTopicArn, fig.getPrimaryRegion()); - for (String regionName : regionNames) { + for ( String regionName : regionNames ) { regionName = regionName.trim(); - Regions regions = Regions.fromName(regionName); - Region region = Region.getRegion(regions); + Regions regions = Regions.fromName( regionName ); + Region region = Region.getRegion( regions ); - AmazonSQSClient sqsClient = createSQSClient(region); - AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously + AmazonSQSClient sqsClient = createSQSClient( region ); + AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously // getTopicArn will create the SNS topic if it doesn't exist - String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true); - topicArns.put(topicArn, regionName); + String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true ); + topicArns.put( topicArn, regionName ); // create the SQS queue if it doesn't exist - String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName); - if (queueArn == null) { - queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig); - queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl); + String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName ); + if ( queueArn == null ) { + queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig ); + queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl ); } - arrQueueArns.put(queueArn, regionName); + arrQueueArns.put( queueArn, regionName ); } - logger.debug("Creating Subscriptions..."); + logger.debug( "Creating Subscriptions..." ); - for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) { + for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) { String queueARN = queueArnEntry.getKey(); String strSqsRegion = queueArnEntry.getValue(); @@@ -650,10 -519,12 +650,10 @@@ /** * Get the region - * - * @return */ private Region getRegion() { - Regions regions = Regions.fromName( fig.getRegion() ); - return Region.getRegion( regions ); + Regions regions = Regions.fromName(fig.getPrimaryRegion()); + return Region.getRegion(regions); }
