Repository: incubator-streams Updated Branches: refs/heads/STREAMS-212.3 [created] 91dd9a3c5
These classes were deleted prematurely, causing unnecessary breaking changes. They should be deprecated as part of STREAMS-218 and deleted in the next minor version. Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/91dd9a3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/91dd9a3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/91dd9a3c Branch: refs/heads/STREAMS-212.3 Commit: 91dd9a3c5a489239edffb341f4aea89c919a4b33 Parents: d0b5a0a Author: sblackmon <[email protected]> Authored: Tue Nov 18 10:59:53 2014 -0600 Committer: sblackmon <[email protected]> Committed: Tue Nov 18 10:59:53 2014 -0600 ---------------------------------------------------------------------- .../processor/TwitterEventProcessor.java | 194 +++++++++++++++++ .../twitter/processor/TwitterTypeConverter.java | 209 +++++++++++++++++++ 2 files changed, 403 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java new file mode 100644 index 0000000..fb4615f --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.processor; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.serializer.*; +import org.apache.streams.util.ComponentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; + +/** + * Created by sblackmon on 12/10/13. 
+ */ +public class TwitterEventProcessor implements StreamsProcessor { + + private final static String STREAMS_ID = "TwitterEventProcessor"; + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class); + + private ObjectMapper mapper = new StreamsTwitterMapper(); + + private Class inClass; + private Class outClass; + + private TwitterJsonActivitySerializer twitterJsonActivitySerializer; + + public TwitterEventProcessor(Class inClass, Class outClass) { + this.inClass = inClass; + this.outClass = outClass; + } + + public TwitterEventProcessor( Class outClass) { + this(null, outClass); + } + + public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { + + Object result = null; + + Preconditions.checkNotNull(event); + Preconditions.checkNotNull(mapper); + Preconditions.checkNotNull(twitterJsonActivitySerializer); + + if( outClass.equals( Activity.class )) { + LOGGER.debug("ACTIVITY"); + result = twitterJsonActivitySerializer.deserialize( + mapper.writeValueAsString(event)); + } else if( outClass.equals( Tweet.class )) { + if ( inClass.equals( Tweet.class )) { + LOGGER.debug("TWEET"); + result = mapper.convertValue(event, Tweet.class); + } + } else if( outClass.equals( Retweet.class )) { + if ( inClass.equals( Retweet.class )) { + LOGGER.debug("RETWEET"); + result = mapper.convertValue(event, Retweet.class); + } + } else if( outClass.equals( Delete.class )) { + if ( inClass.equals( Delete.class )) { + LOGGER.debug("DELETE"); + result = mapper.convertValue(event, Delete.class); + } + } else if( outClass.equals( ObjectNode.class )) { + LOGGER.debug("OBJECTNODE"); + result = mapper.convertValue(event, ObjectNode.class); + } + + // no supported conversion were applied + if( result != null ) + return result; + + LOGGER.debug("CONVERT FAILED"); + + return null; + + } + + public boolean validate(Object document, Class klass) { + + // TODO + return true; + } + + public 
boolean isValidJSON(final String json) { + boolean valid = false; + try { + final JsonParser parser = new ObjectMapper().getJsonFactory() + .createJsonParser(json); + while (parser.nextToken() != null) { + } + valid = true; + } catch (JsonParseException jpe) { + LOGGER.warn("validate: {}", jpe); + } catch (IOException ioe) { + LOGGER.warn("validate: {}", ioe); + } + + return valid; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + // first check for valid json + ObjectNode node = (ObjectNode) entry.getDocument(); + + LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass()); + + String json = null; + try { + json = mapper.writeValueAsString(node); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + if( StringUtils.isNotEmpty(json)) { + + // since data is coming from outside provider, we don't know what type the events are + Class inClass = TwitterEventClassifier.detectClass(json); + + // if the target is string, just pass-through + if (java.lang.String.class.equals(outClass)) + return Lists.newArrayList(new StreamsDatum(json)); + else { + // convert to desired format + Object out = null; + try { + out = convert(node, inClass, outClass); + } catch (ActivitySerializerException e) { + LOGGER.warn("Failed deserializing", e); + return Lists.newArrayList(); + } catch (JsonProcessingException e) { + LOGGER.warn("Failed parsing JSON", e); + return Lists.newArrayList(); + } + + if (out != null && validate(out, outClass)) + return Lists.newArrayList(new StreamsDatum(out)); + } + } + + return Lists.newArrayList(); + + } + + @Override + public void prepare(Object configurationObject) { + mapper = new StreamsJacksonMapper(); + twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); + } + + @Override + public void cleanUp() { + + } +}; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java new file mode 100644 index 0000000..74cce27 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.streams.twitter.processor; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.serializer.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Queue; + +/** + * Created by sblackmon on 12/10/13. 
+ */ +public class TwitterTypeConverter implements StreamsProcessor { + + public final static String STREAMS_ID = "TwitterTypeConverter"; + + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class); + + private ObjectMapper mapper; + + private Queue<StreamsDatum> inQueue; + private Queue<StreamsDatum> outQueue; + + private Class inClass; + private Class outClass; + + private TwitterJsonActivitySerializer twitterJsonActivitySerializer; + + private int count = 0; + + public final static String TERMINATE = new String("TERMINATE"); + + public TwitterTypeConverter(Class inClass, Class outClass) { + this.inClass = inClass; + this.outClass = outClass; + } + + public Queue<StreamsDatum> getProcessorOutputQueue() { + return outQueue; + } + + public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) { + inQueue = inputQueue; + } + + public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { + + Object result = null; + + if( outClass.equals( Activity.class )) { + LOGGER.debug("ACTIVITY"); + result = twitterJsonActivitySerializer.deserialize( + mapper.writeValueAsString(event)); + } else if( outClass.equals( Tweet.class )) { + if ( inClass.equals( Tweet.class )) { + LOGGER.debug("TWEET"); + result = mapper.convertValue(event, Tweet.class); + } + } else if( outClass.equals( Retweet.class )) { + if ( inClass.equals( Retweet.class )) { + LOGGER.debug("RETWEET"); + result = mapper.convertValue(event, Retweet.class); + } + } else if( outClass.equals( Delete.class )) { + if ( inClass.equals( Delete.class )) { + LOGGER.debug("DELETE"); + result = mapper.convertValue(event, Delete.class); + } + } else if( outClass.equals( ObjectNode.class )) { + LOGGER.debug("OBJECTNODE"); + result = mapper.convertValue(event, ObjectNode.class); + } + + // no supported conversion were applied + if( result != null ) { + count ++; + return result; + } + + LOGGER.debug("CONVERT FAILED"); + 
+ return null; + + } + + public boolean validate(Object document, Class klass) { + + // TODO + return true; + } + + public boolean isValidJSON(final String json) { + boolean valid = false; + try { + final JsonParser parser = new ObjectMapper().getJsonFactory() + .createJsonParser(json); + while (parser.nextToken() != null) { + } + valid = true; + } catch (JsonParseException jpe) { + LOGGER.warn("validate: {}", jpe); + } catch (IOException ioe) { + LOGGER.warn("validate: {}", ioe); + } + + return valid; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + StreamsDatum result = null; + + try { + + Object item = entry.getDocument(); + ObjectNode node; + + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + + if( item instanceof String ) { + + // if the target is string, just pass-through + if( String.class.equals(outClass)) { + result = entry; + } + else { + // first check for valid json + node = (ObjectNode)mapper.readTree((String)item); + + // since data is coming from outside provider, we don't know what type the events are + Class inClass = TwitterEventClassifier.detectClass((String) item); + + Object out = convert(node, inClass, outClass); + + if( out != null && validate(out, outClass)) + result = new StreamsDatum(out); + } + + } else if( item instanceof ObjectNode ) { + + // first check for valid json + node = (ObjectNode)mapper.valueToTree(item); + + // since data is coming from outside provider, we don't know what type the events are + Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item)); + + Object out = convert(node, inClass, outClass); + + if( out != null && validate(out, outClass)) + result = new StreamsDatum(out); + + } + + } catch (Exception e) { + e.printStackTrace(); + } + + if( result != null ) + return Lists.newArrayList(result); + else + return Lists.newArrayList(); + } + + @Override + public void prepare(Object o) { + mapper = new StreamsTwitterMapper(); + 
twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); + } + + @Override + public void cleanUp() { + + } + +}
