Author: sblackmon
Date: Mon Feb 24 23:20:47 2014
New Revision: 1571485
URL: http://svn.apache.org/r1571485
Log:
Activating kafka, hdfs, and rss modules
Ensuring providers, processors, and persisters are serializable and implement prepare()/cleanUp()
Modified:
incubator/streams/branches/STREAMS-26/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Mon Feb 24 23:20:47 2014
@@ -79,7 +79,7 @@
<clojure.version>1.4.0</clojure.version>
<storm.version>0.9.0.1</storm.version>
<kafka.version>0.8.0</kafka.version>
- <zookeeper.version>3.3.4</zookeeper.version>
+ <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
<netty.version>3.8.0.Final</netty.version>
<json-path.version>0.9.0</json-path.version>
</properties>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Mon Feb 24
23:20:47 2014
@@ -38,15 +38,15 @@
<modules>
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
- <!--<module>streams-persist-hdfs</module>-->
- <!--<module>streams-persist-kafka</module>-->
+ <module>streams-persist-hdfs</module>
+ <module>streams-persist-kafka</module>
<!--<module>streams-provider-datasift</module>-->
<!--<module>streams-provider-facebook</module>-->
<!--<module>streams-provider-gnip</module>-->
<!--<module>streams-provider-moreover</module>-->
<module>streams-provider-twitter</module>
<!--<module>streams-provider-sysomos</module>-->
- <!--<module>streams-provider-rss</module>-->
+ <module>streams-provider-rss</module>
<!--<module>streams-proxy-semantria</module>-->
</modules>
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
Mon Feb 24 23:20:47 2014
@@ -16,6 +16,8 @@ import org.apache.streams.core.StreamsPe
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.streams.hdfs.HdfsConfiguration;
+
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
@@ -278,14 +280,12 @@ public class WebHdfsPersistWriter implem
.toString();
}
- @Override
public void start() {
connectToWebHDFS();
}
- @Override
public void stop() {
try {
@@ -300,12 +300,10 @@ public class WebHdfsPersistWriter implem
}
}
- @Override
public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
this.persistQueue = persistQueue;
}
- @Override
public Queue<StreamsDatum> getPersistQueue() {
return persistQueue;
}
@@ -327,4 +325,23 @@ public class WebHdfsPersistWriter implem
stop();
}
+
+ @Override
+ public void prepare(Object configurationObject) {
+ connectToWebHDFS();
+ }
+
+ @Override
+ public void cleanUp() {
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
Mon Feb 24 23:20:47 2014
@@ -17,6 +17,8 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.streams.kafka.KafkaConfiguration;
+
import java.io.Serializable;
import java.math.BigInteger;
import java.util.List;
@@ -56,17 +58,10 @@ public class KafkaPersistReader implemen
this.persistQueue = persistQueue;
}
- public KafkaPersistReader(KafkaConfiguration config) {
+ public void setConfig(KafkaConfiguration config) {
this.config = config;
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
-
- public KafkaPersistReader(KafkaConfiguration config, Queue<StreamsDatum>
persistQueue) {
- this.config = config;
- this.persistQueue = persistQueue;
}
- @Override
public void start() {
Properties props = new Properties();
props.setProperty("serializer.encoding", "UTF8");
@@ -86,7 +81,6 @@ public class KafkaPersistReader implemen
}
- @Override
public void stop() {
consumerConnector.shutdown();
while( !executor.isTerminated()) {
@@ -96,18 +90,21 @@ public class KafkaPersistReader implemen
}
}
- @Override
public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
this.persistQueue = persistQueue;
}
- @Override
public Queue<StreamsDatum> getPersistQueue() {
return this.persistQueue;
}
@Override
public StreamsResultSet readAll() {
+ return readCurrent();
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
return null;
}
@@ -138,4 +135,14 @@ public class KafkaPersistReader implemen
// once this class can be told when to shutdown by streams, it will
run stop
// stop();
}
+
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
Mon Feb 24 23:20:47 2014
@@ -42,17 +42,10 @@ public class KafkaPersistWriter implemen
this.persistQueue = persistQueue;
}
- public KafkaPersistWriter(KafkaConfiguration config) {
+ public void setConfig(KafkaConfiguration config) {
this.config = config;
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
}
- public KafkaPersistWriter(KafkaConfiguration config, Queue<StreamsDatum>
persistQueue) {
- this.config = config;
- this.persistQueue = persistQueue;
- }
-
- @Override
public void start() {
Properties props = new Properties();
@@ -68,17 +61,14 @@ public class KafkaPersistWriter implemen
new Thread(new KafkaPersistWriterTask(this)).start();
}
- @Override
public void stop() {
producer.close();
}
- @Override
public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
this.persistQueue = persistQueue;
}
- @Override
public Queue<StreamsDatum> getPersistQueue() {
return this.persistQueue;
}
@@ -106,4 +96,14 @@ public class KafkaPersistWriter implemen
// stop();
}
+
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
Mon Feb 24 23:20:47 2014
@@ -31,7 +31,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class RssStreamProvider /*extends BaseRichSpout*/ implements
StreamsProvider, Serializable {
+public class RssStreamProvider implements StreamsProvider, Serializable {
private final static Logger LOGGER =
LoggerFactory.getLogger(RssStreamProvider.class);
@@ -85,7 +85,6 @@ public class RssStreamProvider /*extends
this.klass = klass;
}
- @Override
public void start() {
Preconditions.checkNotNull(this.klass);
@@ -105,21 +104,21 @@ public class RssStreamProvider /*extends
}
- @Override
public void stop() {
for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
inQueue.add(RssEventProcessor.TERMINATE);
}
}
- @Override
public Queue<StreamsDatum> getProviderQueue() {
return this.providerQueue;
}
@Override
public StreamsResultSet readCurrent() {
- return null;
+
+ // NOTE(review): providerQueue is exposed as a Queue<StreamsDatum> (see getProviderQueue()).
+ // Unless the concrete queue object also implements StreamsResultSet, this cast will throw
+ // ClassCastException at runtime. Consider wrapping the queue in a StreamsResultSet
+ // instead of casting it - TODO confirm against the StreamsResultSet API.
+ return (StreamsResultSet) providerQueue;
+
}
@Override
@@ -131,28 +130,16 @@ public class RssStreamProvider /*extends
public StreamsResultSet readRange(DateTime start, DateTime end) {
return null;
}
-//
-// @Override
-// public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
-// outputFieldsDeclarer.declare(new Fields("document"));
-// outputFieldsDeclarer.declare(new Fields("type"));
-// }
-//
-// @Override
-// public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
-// collector = spoutOutputCollector;
-// run();
-// }
-//
-// @Override
-// public void nextTuple() {
-// try {
-// collector.emit( new Values(outQueue.take(), klass) );
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-//
-// }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
private class RssFeedSetupTask implements Runnable {
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
Mon Feb 24 23:20:47 2014
@@ -4,7 +4,9 @@ import com.fasterxml.jackson.core.JsonPa
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
@@ -16,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
@@ -23,7 +26,7 @@ import java.util.concurrent.BlockingQueu
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterEventProcessor implements Runnable {
+public class TwitterEventProcessor implements StreamsProcessor, Runnable {
private final static Logger LOGGER =
LoggerFactory.getLogger(TwitterEventProcessor.class);
@@ -58,33 +61,25 @@ public class TwitterEventProcessor imple
public void run() {
while(true) {
+ String item;
try {
- String item = inQueue.take();
- Thread.sleep(new Random().nextInt(100));
- if(item==TERMINATE) {
+ // take() blocks until an item is available, so an empty queue does not make
+ // this loop spin (poll() would return null, readTree(null) would throw, and
+ // every pass would log a stack trace).
+ item = inQueue.take();
+ if(item.equals(TERMINATE)) {
LOGGER.info("Terminating!");
- return;
+ break;
}
- // first check for valid json
- ObjectNode node = (ObjectNode)mapper.readTree(item);
+ ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(item);
+ StreamsDatum rawDatum = new StreamsDatum(objectNode);
- // if the target is string, just pass-through
- if( java.lang.String.class.equals(outClass))
- outQueue.offer(new StreamsDatum(item));
- else {
- // convert to desired format
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- outQueue.offer(new StreamsDatum(out));
+ // classification and conversion are delegated to process()
+ for( StreamsDatum entry : process(rawDatum)) {
+ outQueue.offer(entry);
}
+ } catch (InterruptedException e) {
+ // restore the interrupt flag and stop rather than swallowing it and looping
+ Thread.currentThread().interrupt();
+ LOGGER.info("Interrupted; terminating");
+ break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@@ -161,4 +156,39 @@ public class TwitterEventProcessor imple
return valid;
}
+ /**
+ * Classifies a parsed Twitter event and converts it to {@code outClass}.
+ *
+ * @param entry datum whose document is expected to be an ObjectNode (as built by run())
+ * @return a single-element list holding the pass-through JSON or the converted object,
+ * or an empty list when the document cannot be serialized, converted, or validated
+ */
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ ObjectNode node = (ObjectNode) entry.getDocument();
+
+ // asText() returns "" for container nodes such as ObjectNode, so serialize the
+ // node back to its JSON text for classification and String pass-through.
+ String json;
+ try {
+ json = mapper.writeValueAsString(node);
+ } catch (IOException e) {
+ LOGGER.warn("Unable to serialize document", e);
+ return Lists.newArrayList();
+ }
+
+ // since data is coming from outside provider, we don't know what type the events are
+ Class inClass = TwitterEventClassifier.detectClass(json);
+
+ // if the target is string, just pass-through
+ if( java.lang.String.class.equals(outClass))
+ return Lists.newArrayList(new StreamsDatum(json));
+
+ // convert to desired format
+ Object out = convert(node, inClass, outClass);
+
+ if( out != null && validate(out, outClass))
+ return Lists.newArrayList(new StreamsDatum(out));
+
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
};
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
Mon Feb 24 23:20:47 2014
@@ -56,7 +56,7 @@ public class TwitterStreamProvider imple
protected StreamingEndpoint endpoint;
protected BasicClient client;
- protected ListeningExecutorService executor =
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ protected ListeningExecutorService executor;
private static ExecutorService newFixedThreadPoolWithQueueSize(int
nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
@@ -116,6 +116,8 @@ public class TwitterStreamProvider imple
@Override
public void prepare(Object o) {
+ executor =
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
Preconditions.checkNotNull(this.klass);
Preconditions.checkNotNull(config.getOauth().getConsumerKey());
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
Mon Feb 24 23:20:47 2014
@@ -29,7 +29,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable,
Runnable {
private final static Logger LOGGER =
LoggerFactory.getLogger(TwitterTimelineProvider.class);
@@ -57,7 +57,7 @@ public class TwitterTimelineProvider imp
// return inQueue;
// }
- protected ListeningExecutorService executor =
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ protected ListeningExecutorService executor;
protected DateTime start;
protected DateTime end;
@@ -88,8 +88,14 @@ public class TwitterTimelineProvider imp
this.klass = klass;
}
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
public void run() {
+ executor =
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
Preconditions.checkNotNull(providerQueue);
Preconditions.checkNotNull(this.klass);
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
Mon Feb 24 23:20:47 2014
@@ -8,6 +8,8 @@ import org.apache.streams.pojo.json.Acto
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Tweet;
+import java.io.Serializable;
+
/**
* Created with IntelliJ IDEA.
* User: mdelaet
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
Mon Feb 24 23:20:47 2014
@@ -15,6 +15,7 @@ import org.apache.streams.pojo.json.Icon
import org.apache.streams.pojo.json.Provider;
import java.io.IOException;
+import java.io.Serializable;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -29,7 +30,7 @@ import java.util.Map;
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public abstract class TwitterJsonEventActivitySerializer implements
ActivitySerializer<String> {
+public abstract class TwitterJsonEventActivitySerializer implements
ActivitySerializer<String>, Serializable {
public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";