Author: sblackmon
Date: Mon Feb 24 23:34:13 2014
New Revision: 1571490
URL: http://svn.apache.org/r1571490
Log:
adding a few missing files
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
incubator/streams/branches/STREAMS-26/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml?rev=1571490&r1=1571489&r2=1571490&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
Mon Feb 24 23:34:13 2014
@@ -1,15 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true"
type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6"
inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
- <sourceFolder
url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo"
isTestSource="false" generated="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java"
isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources"
type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources"
type="java-test-resource" />
- <excludeFolder url="file://$MODULE_DIR$/target" />
+ <sourceFolder
url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo"
isTestSource="false" generated="true" />
+ <excludeFolder url="file://$MODULE_DIR$/target/classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
+ <excludeFolder
url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
+ <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
+ <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
@@ -30,6 +35,9 @@
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11"
level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3"
level="project" />
<orderEntry type="module" module-name="streams-core" />
+ <orderEntry type="module" module-name="streams-util" />
+ <orderEntry type="library" name="Maven:
org.apache.commons:commons-lang3:3.1" level="project" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1"
level="project" />
<orderEntry type="library" name="Maven:
ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9"
level="project" />
<orderEntry type="module" module-name="streams-pojo" />
@@ -49,8 +57,6 @@
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0"
level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4"
level="project" />
<orderEntry type="module" module-name="streams-config" />
- <orderEntry type="library" name="Maven: com.google.guava:guava:15.0"
level="project" />
- <orderEntry type="library" name="Maven:
com.google.collections:google-collections:1.0" level="project" />
<orderEntry type="library" name="Maven:
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project"
/>
<orderEntry type="library" name="Maven:
com.jayway.jsonpath:json-path:0.9.0" level="project" />
<orderEntry type="library" name="Maven: net.minidev:json-smart:1.2"
level="project" />
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java?rev=1571490&view=auto
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
(added)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
Mon Feb 24 23:34:13 2014
@@ -0,0 +1,166 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Extracts the {@link User} profile carried by raw Twitter tweet and retweet events.
+ *
+ * <p>Run as a {@link Runnable}, it polls the input queue, passes each datum to
+ * {@link #process(StreamsDatum)} and offers the results to the output queue
+ * until a datum whose document equals {@link #TERMINATE} arrives.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+    /** Exclusive upper bound, in milliseconds, of the random pause between polls. */
+    private final static int MAX_PAUSE_MS = 100;
+
+    /** Document value that tells {@link #run()} to stop polling. */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final Random random = new Random();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    /**
+     * Returns the queue that {@link #run()} offers extracted profiles to.
+     *
+     * @return the output queue
+     */
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Sets the queue that {@link #run()} polls for incoming events.
+     *
+     * @param inputQueue queue of raw Twitter events; must be set before {@link #run()}
+     */
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    /**
+     * Polls the input queue until a {@link #TERMINATE} document is seen or the thread
+     * is interrupted, offering every extracted profile to the output queue.
+     */
+    @Override
+    public void run() {
+
+        if (inQueue == null) {
+            // Without an input queue every iteration would only throw and spin.
+            LOGGER.error("No input queue configured, not starting");
+            return;
+        }
+
+        while (true) {
+            try {
+                StreamsDatum item = inQueue.poll();
+
+                // poll() returns null on an empty queue; fall through to the pause and retry.
+                if (item != null) {
+                    // The sentinel travels as the datum's document, not as the datum itself.
+                    if (TERMINATE.equals(item.getDocument())) {
+                        LOGGER.info("Terminating!");
+                        break;
+                    }
+
+                    for (StreamsDatum entry : process(item)) {
+                        outQueue.offer(entry);
+                    }
+                }
+
+                Thread.sleep(random.nextInt(MAX_PAUSE_MS));
+
+            } catch (InterruptedException e) {
+                // Restore the flag so the owner of this thread can observe the interrupt.
+                Thread.currentThread().interrupt();
+                LOGGER.info("Interrupted, terminating");
+                break;
+            } catch (RuntimeException e) {
+                // Keep the worker alive: one bad datum must not stop the stream.
+                LOGGER.error("Unexpected error while draining the input queue", e);
+            }
+        }
+    }
+
+    /**
+     * Extracts the user profile from a single Twitter event.
+     *
+     * @param entry datum whose document is a JSON string or an {@link ObjectNode}
+     * @return a list holding one datum that wraps the tweet's {@link User}, or for a
+     *         retweet the user of the retweeted status; an empty list for any other
+     *         event type, when no user is present, or when the event cannot be read
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+        try {
+            // since data is coming from outside provider, we don't know what type the events are
+            String item;
+            if (entry.getDocument() instanceof String) {
+                item = (String) entry.getDocument();
+            } else {
+                item = mapper.writeValueAsString((ObjectNode) entry.getDocument());
+            }
+
+            Class<?> inClass = TwitterEventClassifier.detectClass(item);
+
+            User user = null;
+            // Constant-first equals() keeps an unclassified (null) event from throwing.
+            if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("TWEET");
+                user = mapper.readValue(item, Tweet.class).getUser();
+            } else if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("RETWEET");
+                Retweet retweet = mapper.readValue(item, Retweet.class);
+                if (retweet.getRetweetedStatus() != null) {
+                    user = retweet.getRetweetedStatus().getUser();
+                }
+            }
+
+            if (user != null) {
+                result.add(new StreamsDatum(user));
+            }
+        } catch (Exception e) {
+            // Parameterized logging stays safe even when entry itself is null.
+            LOGGER.warn("Error processing {}", entry, e);
+        }
+        return result;
+    }
+
+    /** Does nothing; the argument is ignored. */
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    /** Does nothing. */
+    @Override
+    public void cleanUp() {
+
+    }
+}
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java?rev=1571490&view=auto
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
(added)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
Mon Feb 24 23:34:13 2014
@@ -0,0 +1,289 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Converts Twitter events into the output class chosen at construction time.
+ *
+ * <p>{@link #process(StreamsDatum)} accepts documents that are JSON strings or
+ * {@link ObjectNode} trees, detects the event type with
+ * {@link TwitterEventClassifier} and delegates to
+ * {@link #convert(ObjectNode, Class, Class)}.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    // Created eagerly so getProcessorOutputQueue() never hands out null.
+    private Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer =
+            new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer =
+            new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer =
+            new TwitterJsonDeleteActivitySerializer();
+
+    /**
+     * Creates a converter.
+     *
+     * @param inClass  expected input class; stored, but {@link #process(StreamsDatum)}
+     *                 detects the type of each event itself
+     * @param outClass class that {@link #process(StreamsDatum)} converts events into
+     */
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Returns the output queue of this processor.
+     *
+     * @return the output queue
+     */
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Sets the input queue of this processor.
+     *
+     * @param inputQueue queue of incoming events
+     */
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    /**
+     * Converts a Twitter event into an instance of the requested class.
+     *
+     * <p>An {@link Activity} can be produced from {@link Delete}, {@link Retweet} and
+     * {@link Tweet} events; {@link Tweet}, {@link Retweet} and {@link Delete} targets
+     * require an event of the same type; an {@link ObjectNode} target accepts any event.
+     *
+     * @param event    the event as a JSON tree
+     * @param inClass  detected type of the event
+     * @param outClass requested target type
+     * @return the converted object, or null when {@code event} is null or the
+     *         combination of {@code inClass} and {@code outClass} is not supported
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+
+        if (event == null) {
+            LOGGER.debug("CONVERT FAILED");
+            return null;
+        }
+
+        LOGGER.debug("{}", event);
+
+        Object result = null;
+
+        // Constant-first equals() tolerates a null inClass or outClass.
+        if (Activity.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY DELETE");
+                result = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                result = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY TWEET");
+                result = twitterJsonTweetActivitySerializer.convert(event);
+            }
+        } else if (Tweet.class.equals(outClass)) {
+            if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if (Retweet.class.equals(outClass)) {
+            if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if (Delete.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if (ObjectNode.class.equals(outClass)) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if (result == null) {
+            // no supported conversion was applied
+            LOGGER.debug("CONVERT FAILED");
+        }
+
+        return result;
+    }
+
+    /**
+     * Validates a converted document against its target class.
+     *
+     * @param document the converted document
+     * @param klass    the class the document was converted to
+     * @return always true; validation is not implemented yet
+     */
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    /**
+     * Checks that a string can be tokenized as JSON from start to end.
+     *
+     * @param json the text to check
+     * @return true when the parser consumes the whole input without error (input that
+     *         yields no tokens at all passes too); false for null or malformed input
+     */
+    public boolean isValidJSON(final String json) {
+        if (json == null) {
+            return false;
+        }
+
+        JsonParser parser = null;
+        try {
+            parser = mapper.getJsonFactory().createJsonParser(json);
+            while (parser.nextToken() != null) {
+                // consume every token; malformed input throws before the end is reached
+            }
+            return true;
+        } catch (IOException ioe) {
+            // JsonParseException is an IOException, so a single handler covers both.
+            LOGGER.warn("validate: {}", ioe.getMessage());
+            return false;
+        } finally {
+            if (parser != null) {
+                try {
+                    parser.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing an in-memory parser fails
+                }
+            }
+        }
+    }
+
+    /**
+     * Converts the document of a datum into the output class given to the constructor.
+     *
+     * @param entry datum whose document is a JSON string or an {@link ObjectNode}
+     * @return a list with the single converted datum (or {@code entry} itself for a
+     *         string document when the output class is {@link String}); an empty list
+     *         when the document has another type, cannot be parsed or cannot be converted
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+
+            if (item instanceof String) {
+
+                if (String.class.equals(outClass)) {
+                    // if the target is string, just pass-through
+                    result = entry;
+                } else {
+                    String json = (String) item;
+                    result = convertEvent(json, mapper.readTree(json));
+                }
+
+            } else if (item instanceof ObjectNode) {
+
+                JsonNode node = mapper.valueToTree(item);
+                // The classifier takes text, so hand it the serialized tree.
+                result = convertEvent(mapper.writeValueAsString(node), node);
+
+            }
+
+        } catch (Exception e) {
+            // Parameterized logging stays safe even when entry itself is null.
+            LOGGER.warn("Error processing {}", entry, e);
+        }
+
+        if (result != null) {
+            return Lists.newArrayList(result);
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * Detects the type of an event and converts it into the configured output class.
+     *
+     * @param json the event as text, used for type detection
+     * @param tree the same event as a parsed tree
+     * @return the converted datum, or null when the tree is not a JSON object or no
+     *         conversion applies
+     */
+    private StreamsDatum convertEvent(String json, JsonNode tree) {
+
+        if (!(tree instanceof ObjectNode)) {
+            LOGGER.debug("Not a JSON object: {}", json);
+            return null;
+        }
+
+        // since data is coming from outside provider, we don't know what type the events are
+        Class detected = TwitterEventClassifier.detectClass(json);
+
+        Object out = convert((ObjectNode) tree, detected, outClass);
+
+        if (out != null && validate(out, outClass)) {
+            return new StreamsDatum(out);
+        }
+        return null;
+    }
+
+    /** Does nothing; the argument is ignored. */
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    /** Does nothing. */
+    @Override
+    public void cleanUp() {
+
+    }
+
+    /** Does nothing; queue-driven processing is not implemented in this class. */
+    @Override
+    public void run() {
+
+    }
+}
Added:
incubator/streams/branches/STREAMS-26/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java?rev=1571490&view=auto
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
(added)
+++
incubator/streams/branches/STREAMS-26/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
Mon Feb 24 23:34:13 2014
@@ -0,0 +1,86 @@
+package org.apache.streams.util;
+
+import java.io.*;
+
+/**
+ * Helpers for Java-native serialization of objects to and from byte arrays.
+ *
+ * Created by rebanks on 2/18/14.
+ */
+public class SerializationUtil {
+
+    /**
+     * Serializes an object into a byte array using Java serialization.
+     *
+     * BORROWED FROM APACHE STORM PROJECT
+     *
+     * @param obj the object to serialize; null is allowed
+     * @return the serialized form of {@code obj}
+     * @throws RuntimeException wrapping the {@link IOException} raised when {@code obj}
+     *         cannot be serialized (for example, it is not {@link Serializable})
+     */
+    public static byte[] serialize(Object obj) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = null;
+        try {
+            oos = new ObjectOutputStream(bos);
+            oos.writeObject(obj);
+            // Push buffered object data into bos before it is copied out.
+            oos.flush();
+            return bos.toByteArray();
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        } finally {
+            closeQuietly(oos);
+        }
+    }
+
+    /**
+     * Reconstructs an object from bytes produced by {@link #serialize(Object)}.
+     *
+     * <p>NOTE(review): Java deserialization can execute code chosen by whoever produced
+     * the bytes; only pass data from trusted sources.
+     *
+     * BORROWED FROM APACHE STORM PROJECT
+     *
+     * @param serialized the serialized bytes; must not be null
+     * @return the deserialized object
+     * @throws RuntimeException wrapping the {@link IOException} or
+     *         {@link ClassNotFoundException} raised while reading the object
+     */
+    public static Object deserialize(byte[] serialized) {
+        ObjectInputStream ois = null;
+        try {
+            ois = new ObjectInputStream(new ByteArrayInputStream(serialized));
+            return ois.readObject();
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        } finally {
+            closeQuietly(ois);
+        }
+    }
+
+    /**
+     * Copies an object by serializing it and deserializing the result.
+     *
+     * @param obj the object to copy
+     * @return the object read back from the serialized form of {@code obj}
+     */
+    public static Object cloneBySerialization(Object obj) {
+        return deserialize(serialize(obj));
+    }
+
+    /** Closes a stream, ignoring failures; null is tolerated. */
+    private static void closeQuietly(Closeable stream) {
+        if (stream == null) {
+            return;
+        }
+        try {
+            stream.close();
+        } catch (IOException ignored) {
+            // the result (or the primary exception) has already been determined
+        }
+    }
+}