simplify/isolate push provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e66e51a Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e66e51a Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e66e51a Branch: refs/heads/master Commit: 2e66e51a9fbef1e1fb7ae922a37d94d38dcf254a Parents: f272ff5 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Wed Jul 30 21:22:22 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Wed Jul 30 21:26:33 2014 -0500 ---------------------------------------------------------------------- .../datasift/provider/DatasiftPushProvider.java | 61 +++++++------------- 1 file changed, 20 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e66e51a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java index 196f504..264dbbe 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java @@ -18,28 +18,22 @@ under the License. */ package org.apache.streams.datasift.provider; -import com.datasift.client.DataSiftClient; import com.datasift.client.stream.DeletedInteraction; -import com.datasift.client.stream.Interaction; import com.datasift.client.stream.StreamEventListener; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.datasift.DatasiftConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; -import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Requires Java Version 1.7! @@ -51,27 +45,17 @@ public class DatasiftPushProvider implements StreamsProvider { private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class); private DatasiftConfiguration config; - protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>(); - private Map<String, DataSiftClient> clients; - private StreamEventListener eventListener; - private ObjectMapper mapper; + protected Queue<StreamsDatum> providerQueue; - public DatasiftPushProvider() { + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - } + public DatasiftPushProvider() { - // to set up a webhook we need to be able to return a reference to this queue - public Queue<Interaction> getInteractions() { - return interactions; } @Override public void startStream() { - - Preconditions.checkNotNull(this.config); - Preconditions.checkNotNull(this.config.getApiKey()); - Preconditions.checkNotNull(this.config.getUserName()); - + Preconditions.checkNotNull(providerQueue); } /** @@ -85,23 +69,17 @@ public class DatasiftPushProvider implements StreamsProvider { //This is a hack. It is only like this because of how perpetual streams work at the moment. Read list server to debate/vote for new interfaces. public StreamsResultSet readCurrent() { Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue(); - StreamsDatum datum = null; - Interaction interaction; - while (!this.interactions.isEmpty()) { - interaction = this.interactions.poll(); - try { - datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue()); - } catch (JsonProcessingException jpe) { - LOGGER.error("Exception while converting Interaction to String : {}", jpe); - } - if (datum != null) { - while (!datums.offer(datum)) { - Thread.yield(); - } - } + StreamsResultSet current = new StreamsResultSet(datums); + try { + lock.writeLock().lock(); + current = new StreamsResultSet(providerQueue); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } - return new StreamsResultSet(datums); + + return current; } @Override @@ -115,14 +93,12 @@ public class DatasiftPushProvider implements StreamsProvider { @Override public boolean isRunning() { - return this.clients != null && this.clients.size() > 0; + return true; } @Override public void prepare(Object configurationObject) { - this.interactions = new ConcurrentLinkedQueue<Interaction>(); - this.clients = Maps.newHashMap(); - this.mapper = StreamsJacksonMapper.getInstance(); + this.providerQueue = constructQueue(); } @Override @@ -138,6 +114,9 @@ public class DatasiftPushProvider implements StreamsProvider { this.config = config; } + private Queue<StreamsDatum> constructQueue() { + return Queues.newConcurrentLinkedQueue(); + } /** * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST