STREAMS-280 | Adding a document count in the Sysomos Provider

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bee7c9db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bee7c9db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bee7c9db

Branch: refs/heads/asf-master
Commit: bee7c9db2ff1db7e464fafec53f6271e5ad7c60d
Parents: 90a96df
Author: Robert Douglas <rdoug...@w2ogroup.com>
Authored: Wed Feb 11 11:42:57 2015 -0600
Committer: Robert Douglas <rdoug...@w2ogroup.com>
Committed: Wed Feb 11 11:42:57 2015 -0600

----------------------------------------------------------------------
 .../org/apache/streams/sysomos/provider/SysomosProvider.java | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bee7c9db/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 07e8c3c..0d8ef9e 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -83,6 +84,7 @@ public class SysomosProvider implements StreamsProvider {
     private Map<String, String> addedAfter;
     private Mode mode = Mode.CONTINUOUS;
     private boolean started = false;
+    private AtomicInteger count;
 
     public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
         this.config = sysomosConfiguration;
@@ -91,6 +93,7 @@ public class SysomosProvider implements StreamsProvider {
         this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? 
LATENCY : sysomosConfiguration.getMinDelayMs();
         this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == 
null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
         this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? 
API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
+        this.count = new AtomicInteger();
     }
 
     public SysomosConfiguration getConfig() {
@@ -138,6 +141,7 @@ public class SysomosProvider implements StreamsProvider {
         try {
             lock.writeLock().lock();
             LOGGER.debug("Creating new result set for {} items", 
providerQueue.size());
+            count.addAndGet(providerQueue.size());
             current = new StreamsResultSet(providerQueue);
             providerQueue = constructQueue();
         } finally {
@@ -289,4 +293,8 @@ public class SysomosProvider implements StreamsProvider {
     private Queue<StreamsDatum> constructQueue() {
         return Queues.newConcurrentLinkedQueue();
     }
+
+    public int getCount() {
+        return this.count.get();
+    }
 }

Reply via email to