Back to Schedulers.io()

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/949f71be
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/949f71be
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/949f71be

Branch: refs/heads/master
Commit: 949f71be2b2f8dd6352d82195a72f8c3f31c32f1
Parents: e76e65d
Author: Michael Russo <mru...@apigee.com>
Authored: Wed Apr 20 19:42:44 2016 -0700
Committer: Michael Russo <mru...@apigee.com>
Committed: Wed Apr 20 19:42:44 2016 -0700

----------------------------------------------------------------------
 .../notifications/impl/ApplicationQueueManagerImpl.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/949f71be/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index fa8c8a2..44ccf2b 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -64,7 +64,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
 
 
-    private final Scheduler scheduler;
+    //private final Scheduler scheduler;
 
 
 
@@ -79,8 +79,10 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
         this.queueName = getQueueNames(properties);
         this.queueMeter = 
metricsFactory.getMeter(ApplicationQueueManagerImpl.class, 
"notification.queue");
         this.sendMeter = metricsFactory.getMeter(NotificationsService.class, 
"queue.send");
+        this.concurrencyFactor = 
Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50"));
 
 
+        /**
         int maxAsyncThreads;
         int workerQueueSize;
 
@@ -88,7 +90,6 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
             maxAsyncThreads = 
Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200"));
             workerQueueSize = 
Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000"));
-            this.concurrencyFactor = 
Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50"));
 
         } catch (Exception e){
 
@@ -104,7 +105,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
         this.scheduler = Schedulers.from(TaskExecutorFactory
             .createTaskExecutor( "push-device-io", maxAsyncThreads, 
workerQueueSize,
                 TaskExecutorFactory.RejectionAction.CALLERRUNS ));
-
+         **/
 
     }
 
@@ -308,7 +309,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
                         })
                         .map(sendMessageFunction)
-                        .subscribeOn(scheduler);
+                        .subscribeOn(Schedulers.io());
 
                 }, concurrencyFactor)
                 .distinct( queueMessage -> {
@@ -374,7 +375,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
                 });
 
-            processMessagesObservable.subscribeOn(scheduler).subscribe(); // 
fire the queuing into the background
+            
processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the 
queuing into the background
 
         }
 

Reply via email to