STREAMS-280 | Adding a document count in the Sysomos Provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bee7c9db Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bee7c9db Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bee7c9db Branch: refs/heads/asf-master Commit: bee7c9db2ff1db7e464fafec53f6271e5ad7c60d Parents: 90a96df Author: Robert Douglas <rdoug...@w2ogroup.com> Authored: Wed Feb 11 11:42:57 2015 -0600 Committer: Robert Douglas <rdoug...@w2ogroup.com> Committed: Wed Feb 11 11:42:57 2015 -0600 ---------------------------------------------------------------------- .../org/apache/streams/sysomos/provider/SysomosProvider.java | 8 ++++++++ 1 file changed, 8 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bee7c9db/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index 07e8c3c..0d8ef9e 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -83,6 +84,7 @@ public class SysomosProvider implements StreamsProvider { private Map<String, String> addedAfter; private Mode mode = Mode.CONTINUOUS; private boolean started = false; + private AtomicInteger count; public SysomosProvider(SysomosConfiguration sysomosConfiguration) { this.config = sysomosConfiguration; @@ -91,6 +93,7 @@ public class SysomosProvider implements StreamsProvider { this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs(); this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs(); this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize(); + this.count = new AtomicInteger(); } public SysomosConfiguration getConfig() { @@ -138,6 +141,7 @@ public class SysomosProvider implements StreamsProvider { try { lock.writeLock().lock(); LOGGER.debug("Creating new result set for {} items", providerQueue.size()); + count.addAndGet(providerQueue.size()); current = new StreamsResultSet(providerQueue); providerQueue = constructQueue(); } finally { @@ -289,4 +293,8 @@ public class SysomosProvider implements StreamsProvider { private Queue<StreamsDatum> constructQueue() { return Queues.newConcurrentLinkedQueue(); } + + public int getCount() { + return this.count.get(); + } }