Author: sblackmon
Date: Mon Feb 24 23:20:47 2014
New Revision: 1571485

URL: http://svn.apache.org/r1571485
Log:
Activating kafka and hdfs modules
Ensuring providers, processors, and persisters are serializable

Modified:
    incubator/streams/branches/STREAMS-26/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java

Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Mon Feb 24 23:20:47 2014
@@ -79,7 +79,7 @@
         <clojure.version>1.4.0</clojure.version>
         <storm.version>0.9.0.1</storm.version>
         <kafka.version>0.8.0</kafka.version>
-        <zookeeper.version>3.3.4</zookeeper.version>
+        <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
         <netty.version>3.8.0.Final</netty.version>
         <json-path.version>0.9.0</json-path.version>
     </properties>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Mon Feb 24 23:20:47 2014
@@ -38,15 +38,15 @@
     <modules>
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
-        <!--<module>streams-persist-hdfs</module>-->
-        <!--<module>streams-persist-kafka</module>-->
+        <module>streams-persist-hdfs</module>
+        <module>streams-persist-kafka</module>
         <!--<module>streams-provider-datasift</module>-->
         <!--<module>streams-provider-facebook</module>-->
         <!--<module>streams-provider-gnip</module>-->
         <!--<module>streams-provider-moreover</module>-->
         <module>streams-provider-twitter</module>
         <!--<module>streams-provider-sysomos</module>-->
-        <!--<module>streams-provider-rss</module>-->
+        <module>streams-provider-rss</module>
         <!--<module>streams-proxy-semantria</module>-->
     </modules>
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Mon Feb 24 23:20:47 2014
@@ -16,6 +16,8 @@ import org.apache.streams.core.StreamsPe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.streams.hdfs.HdfsConfiguration;
+
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.URI;
@@ -278,14 +280,12 @@ public class WebHdfsPersistWriter implem
                     .toString();
     }
 
-    @Override
     public void start() {
 
         connectToWebHDFS();
 
     }
 
-    @Override
     public void stop() {
 
         try {
@@ -300,12 +300,10 @@ public class WebHdfsPersistWriter implem
         }
     }
 
-    @Override
     public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
         this.persistQueue = persistQueue;
     }
 
-    @Override
     public Queue<StreamsDatum> getPersistQueue() {
         return persistQueue;
     }
@@ -327,4 +325,23 @@ public class WebHdfsPersistWriter implem
 
         stop();
     }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        connectToWebHDFS();
+    }
+
+    @Override
+    public void cleanUp() {
+        try {
+            flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java Mon Feb 24 23:20:47 2014
@@ -17,6 +17,8 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.streams.kafka.KafkaConfiguration;
+
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
@@ -56,17 +58,10 @@ public class KafkaPersistReader implemen
         this.persistQueue = persistQueue;
     }
 
-    public KafkaPersistReader(KafkaConfiguration config) {
+    public void setConfig(KafkaConfiguration config) {
         this.config = config;
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    public KafkaPersistReader(KafkaConfiguration config, Queue<StreamsDatum> persistQueue) {
-        this.config = config;
-        this.persistQueue = persistQueue;
     }
 
-    @Override
     public void start() {
         Properties props = new Properties();
         props.setProperty("serializer.encoding", "UTF8");
@@ -86,7 +81,6 @@ public class KafkaPersistReader implemen
 
     }
 
-    @Override
     public void stop() {
         consumerConnector.shutdown();
         while( !executor.isTerminated()) {
@@ -96,18 +90,21 @@ public class KafkaPersistReader implemen
         }
     }
 
-    @Override
     public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
         this.persistQueue = persistQueue;
     }
 
-    @Override
     public Queue<StreamsDatum> getPersistQueue() {
         return this.persistQueue;
     }
 
     @Override
     public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
         return null;
     }
 
@@ -138,4 +135,14 @@ public class KafkaPersistReader implemen
         // once this class can be told when to shutdown by streams, it will run stop
         // stop();
     }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Mon Feb 24 23:20:47 2014
@@ -42,17 +42,10 @@ public class KafkaPersistWriter implemen
         this.persistQueue = persistQueue;
     }
 
-    public KafkaPersistWriter(KafkaConfiguration config) {
+    public void setConfig(KafkaConfiguration config) {
         this.config = config;
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
     }
 
-    public KafkaPersistWriter(KafkaConfiguration config, Queue<StreamsDatum> persistQueue) {
-        this.config = config;
-        this.persistQueue = persistQueue;
-    }
-
-    @Override
     public void start() {
         Properties props = new Properties();
 
@@ -68,17 +61,14 @@ public class KafkaPersistWriter implemen
         new Thread(new KafkaPersistWriterTask(this)).start();
     }
 
-    @Override
     public void stop() {
         producer.close();
     }
 
-    @Override
     public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
         this.persistQueue = persistQueue;
     }
 
-    @Override
     public Queue<StreamsDatum> getPersistQueue() {
         return this.persistQueue;
     }
@@ -106,4 +96,14 @@ public class KafkaPersistWriter implemen
 
         // stop();
     }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java Mon Feb 24 23:20:47 2014
@@ -31,7 +31,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class RssStreamProvider /*extends BaseRichSpout*/ implements StreamsProvider, Serializable {
+public class RssStreamProvider implements StreamsProvider, Serializable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
 
@@ -85,7 +85,6 @@ public class RssStreamProvider /*extends
         this.klass = klass;
     }
 
-    @Override
     public void start() {
 
         Preconditions.checkNotNull(this.klass);
@@ -105,21 +104,21 @@ public class RssStreamProvider /*extends
 
     }
 
-    @Override
     public void stop() {
         for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
             inQueue.add(RssEventProcessor.TERMINATE);
         }
     }
 
-    @Override
     public Queue<StreamsDatum> getProviderQueue() {
         return this.providerQueue;
     }
 
     @Override
     public StreamsResultSet readCurrent() {
-        return null;
+
+        return (StreamsResultSet) providerQueue;
+
     }
 
     @Override
@@ -131,28 +130,16 @@ public class RssStreamProvider /*extends
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }
-//
-//    @Override
-//    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-//        outputFieldsDeclarer.declare(new Fields("document"));
-//        outputFieldsDeclarer.declare(new Fields("type"));
-//    }
-//
-//    @Override
-//    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-//        collector = spoutOutputCollector;
-//        run();
-//    }
-//
-//    @Override
-//    public void nextTuple() {
-//        try {
-//            collector.emit( new Values(outQueue.take(), klass) );
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//
-//    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
 
     private class RssFeedSetupTask implements Runnable {
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Mon Feb 24 23:20:47 2014
@@ -4,7 +4,9 @@ import com.fasterxml.jackson.core.JsonPa
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -16,6 +18,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
@@ -23,7 +26,7 @@ import java.util.concurrent.BlockingQueu
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterEventProcessor implements Runnable {
+public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
 
@@ -58,33 +61,25 @@ public class TwitterEventProcessor imple
     public void run() {
 
         while(true) {
+            String item;
             try {
-                String item = inQueue.take();
-                Thread.sleep(new Random().nextInt(100));
-                if(item==TERMINATE) {
+                item = inQueue.poll();
+                if(item instanceof String && item.equals(TERMINATE)) {
                     LOGGER.info("Terminating!");
-                    return;
+                    break;
                 }
 
-                // first check for valid json
-                ObjectNode node = (ObjectNode)mapper.readTree(item);
+                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
 
-                // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass(item);
+                StreamsDatum rawDatum = new StreamsDatum(objectNode);
 
-                // if the target is string, just pass-through
-                if( java.lang.String.class.equals(outClass))
-                    outQueue.offer(new StreamsDatum(item));
-                else {
-                    // convert to desired format
-                    Object out = convert(node, inClass, outClass);
-
-                    if( out != null && validate(out, outClass))
-                        outQueue.offer(new StreamsDatum(out));
+                for( StreamsDatum entry : process(rawDatum)) {
+                    outQueue.offer(entry);
                 }
 
             } catch (Exception e) {
                 e.printStackTrace();
+
             }
         }
     }
@@ -161,4 +156,39 @@ public class TwitterEventProcessor imple
         return valid;
     }
 
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        // first check for valid json
+        ObjectNode node = (ObjectNode) entry.getDocument();
+
+        String json = node.asText();
+
+        // since data is coming from outside provider, we don't know what type the events are
+        Class inClass = TwitterEventClassifier.detectClass(json);
+
+        // if the target is string, just pass-through
+        if( java.lang.String.class.equals(outClass))
+            return Lists.newArrayList(new StreamsDatum(json));
+        else {
+            // convert to desired format
+            Object out = convert(node, inClass, outClass);
+
+            if( out != null && validate(out, outClass))
+                return Lists.newArrayList(new StreamsDatum(out));
+        }
+
+        return Lists.newArrayList();
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
 };

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Mon Feb 24 23:20:47 2014
@@ -56,7 +56,7 @@ public class TwitterStreamProvider imple
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
 
-    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    protected ListeningExecutorService executor;
 
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
@@ -116,6 +116,8 @@ public class TwitterStreamProvider imple
     @Override
     public void prepare(Object o) {
 
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
         Preconditions.checkNotNull(this.klass);
 
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Mon Feb 24 23:20:47 2014
@@ -29,7 +29,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
@@ -57,7 +57,7 @@ public class TwitterTimelineProvider imp
 //        return inQueue;
 //    }
 
-    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    protected ListeningExecutorService executor;
 
     protected DateTime start;
     protected DateTime end;
@@ -88,8 +88,14 @@ public class TwitterTimelineProvider imp
         this.klass = klass;
     }
 
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
     public void run() {
 
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
         Preconditions.checkNotNull(providerQueue);
 
         Preconditions.checkNotNull(this.klass);

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java Mon Feb 24 23:20:47 2014
@@ -8,6 +8,8 @@ import org.apache.streams.pojo.json.Acto
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
 
+import java.io.Serializable;
+
 /**
 * Created with IntelliJ IDEA.
 * User: mdelaet

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1571485&r1=1571484&r2=1571485&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Mon Feb 24 23:20:47 2014
@@ -15,6 +15,7 @@ import org.apache.streams.pojo.json.Icon
 import org.apache.streams.pojo.json.Provider;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -29,7 +30,7 @@ import java.util.Map;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
+public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String>, Serializable {
 
     public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 


Reply via email to