http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java index 6c08eb1..ae0709a 100644 --- a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java @@ -19,14 +19,15 @@ under the License. package org.apache.streams.jackson; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,53 +37,58 @@ import java.util.Map; /** * This processor walks an input objectnode and corrects any artifacts - * that may have occured from improper serialization of jsonschema2pojo beans. + * that may have occured from improper serialization of jackson beans. * + * <p/> * The logic is also available for inclusion in other module via static import. */ public class CleanAdditionalPropertiesProcessor implements StreamsProcessor { - public static final String STREAMS_ID = "CleanAdditionalPropertiesProcessor"; + public static final String STREAMS_ID = "CleanAdditionalPropertiesProcessor"; - private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class); - private ObjectMapper mapper; + private ObjectMapper mapper; - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum datum) { - List<StreamsDatum> result = Lists.newLinkedList(); - ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class); - cleanAdditionalProperties(activity); - datum.setDocument(activity); - result.add(datum); - return result; - } + @Override + public List<StreamsDatum> process(StreamsDatum datum) { + List<StreamsDatum> result = Lists.newLinkedList(); + ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class); + cleanAdditionalProperties(activity); + datum.setDocument(activity); + result.add(datum); + return result; + } - @Override - public void prepare(Object o) { - this.mapper = StreamsJacksonMapper.getInstance(); - this.mapper.registerModule(new JsonOrgModule()); - } + @Override + public void prepare(Object configurationObject) { + this.mapper = StreamsJacksonMapper.getInstance(); + this.mapper.registerModule(new JsonOrgModule()); + } - @Override - public void cleanUp() { + @Override + public void cleanUp() { - } + } - public static void cleanAdditionalProperties(ObjectNode node) { - if( node.get("additionalProperties") != null ) { - ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties"); - cleanAdditionalProperties(additionalProperties); - Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = additionalProperties.fields(); - while( jsonNodeIterator.hasNext() ) { - Map.Entry<String, JsonNode> entry = jsonNodeIterator.next(); - node.put(entry.getKey(), entry.getValue()); - } - } + /** + * Recursively removes all additionalProperties maps. + * @param node ObjectNode + */ + public static void cleanAdditionalProperties(ObjectNode node) { + if ( node.get("additionalProperties") != null ) { + ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties"); + cleanAdditionalProperties(additionalProperties); + Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = additionalProperties.fields(); + while ( jsonNodeIterator.hasNext() ) { + Map.Entry<String, JsonNode> entry = jsonNodeIterator.next(); + node.put(entry.getKey(), entry.getValue()); + } } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java new file mode 100644 index 0000000..ac4ff08 --- /dev/null +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.jackson; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +/** + * JSON utilities. + */ +public class JsonUtil { + + private JsonUtil() {} + + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static JsonFactory factory = mapper.getFactory(); + + public static JsonNode jsonToJsonNode(String json) { + JsonNode node; + try { + JsonParser jp = factory.createJsonParser(json); + node = mapper.readTree(jp); + } catch (IOException e) { + throw new RuntimeException("IO exception while reading JSON", e); + } + return node; + } + + public static String jsonNodeToJson(JsonNode node) { + try { + return mapper.writeValueAsString(node); + } catch (JsonProcessingException e) { + throw new RuntimeException("IO exception while writing JSON", e); + } + } + + public static <T> T jsonToObject(String json, Class<T> clazz) { + try { + return mapper.readValue(json, clazz); + } catch (IOException e) { + throw new RuntimeException("Could not map to object"); + } + } + + public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) { + return mapper.convertValue(node, clazz); + } + + public static <T> JsonNode objectToJsonNode(T obj) { + return mapper.valueToTree(obj); + } + + public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) { + return mapper.convertValue(node, new TypeReference<List<T>>() {}); + } + + public static <T> String objectToJson(T object) { + try { + return mapper.writeValueAsString(object); + } catch (IOException e) { + throw new RuntimeException("Could not map to object"); + } + } + + public static <T> T getObjFromFile(String filePath, Class<T> clazz) { + return jsonNodeToObject(getFromFile(filePath), clazz); + } + + public static JsonNode getFromFile(String filePath) { + JsonFactory factory = mapper.getFactory(); // since 2.1 use mapper.getFactory() instead + + JsonNode node = null; + try { + InputStream stream = getStreamForLocation(filePath); + JsonParser jp = factory.createParser(stream); + node = mapper.readTree(jp); + } catch (IOException e) { + throw new RuntimeException(e); + } + return node; + } + + private static InputStream getStreamForLocation(String filePath) throws FileNotFoundException { + InputStream stream = null; + if(filePath.startsWith("file:///")) { + stream = new FileInputStream(filePath.replace("file:///", "")); + } else if(filePath.startsWith("file:") || filePath.startsWith("/")) { + stream = new FileInputStream(filePath.replace("file:", "")); + } else { + //Assume classpath + stream = JsonUtil.class.getClassLoader().getResourceAsStream(filePath.replace("classpath:", "")); + } + + return stream; + } + + /** + * Creates an empty array if missing + * @param node object to create the array within + * @param field location to create the array + * @return the Map representing the extensions property + */ + public static ArrayNode ensureArray(ObjectNode node, String field) { + String[] path = Lists.newArrayList(Splitter.on('.').split(field)).toArray(new String[0]); + ObjectNode current = node; + ArrayNode result = null; + for( int i = 0; i < path.length; i++) { + current = ensureObject((ObjectNode) node.get(path[i]), path[i]); + } + if (current.get(field) == null) + current.put(field, mapper.createArrayNode()); + result = (ArrayNode) node.get(field); + return result; + } + + /** + * Creates an empty array if missing + * @param node objectnode to create the object within + * @param field location to create the object + * @return the Map representing the extensions property + */ + public static ObjectNode ensureObject(ObjectNode node, String field) { + String[] path = Lists.newArrayList(Splitter.on('.').split(field)).toArray(new String[0]); + ObjectNode current = node; + ObjectNode result = null; + for( int i = 0; i < path.length; i++) { + if (node.get(field) == null) + node.put(field, mapper.createObjectNode()); + current = (ObjectNode) node.get(field); + } + result = ensureObject((ObjectNode) node.get(path[path.length]), Joiner.on('.').join(Arrays.copyOfRange(path, 1, path.length))); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java index 454f99e..4736ee2 100644 --- a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java @@ -19,102 +19,123 @@ under the License. package org.apache.streams.jackson; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.Serializable; import java.util.List; /** - * + * TypeConverterProcessor changes the JVM type while maintaining + * the underlying document. */ public class TypeConverterProcessor implements StreamsProcessor { - public static final String STREAMS_ID = "TypeConverterProcessor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); - - private List<String> formats = Lists.newArrayList(); - - private ObjectMapper mapper; - - private Class inClass; - private Class outClass; - - public TypeConverterProcessor(Class inClass, Class outClass, ObjectMapper mapper) { - this.inClass = inClass; - this.outClass = outClass; - this.mapper = mapper; + public static final String STREAMS_ID = "TypeConverterProcessor"; + + private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); + + private List<String> formats = Lists.newArrayList(); + + private ObjectMapper mapper; + + private Class inClass; + private Class outClass; + + + /** + * TypeConverterProcessor constructor. + * @param inClass inClass + * @param outClass outClass + * @param mapper mapper + */ + public TypeConverterProcessor(Class inClass, Class outClass, ObjectMapper mapper) { + this.inClass = inClass; + this.outClass = outClass; + this.mapper = mapper; + } + + /** + * TypeConverterProcessor constructor. + * @param inClass inClass + * @param outClass outClass + * @param formats formats + */ + public TypeConverterProcessor(Class inClass, Class outClass, List<String> formats) { + this.inClass = inClass; + this.outClass = outClass; + this.formats = formats; + } + + /** + * TypeConverterProcessor constructor. + * @param inClass inClass + * @param outClass outClass + */ + public TypeConverterProcessor(Class inClass, Class outClass) { + this.inClass = inClass; + this.outClass = outClass; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = Lists.newLinkedList(); + Object inDoc = entry.getDocument(); + ObjectNode node = null; + if ( inClass == String.class + || inDoc instanceof String ) { + try { + node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class); + } catch (IOException ex) { + ex.printStackTrace(); + } + } else { + node = this.mapper.convertValue(inDoc, ObjectNode.class); } - public TypeConverterProcessor(Class inClass, Class outClass, List<String> formats) { - this.inClass = inClass; - this.outClass = outClass; - this.formats = formats; - } - - public TypeConverterProcessor(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newLinkedList(); - Object inDoc = entry.getDocument(); - ObjectNode node = null; - if( inClass == String.class || - inDoc instanceof String ) { - try { - node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class); - } catch (IOException e) { - e.printStackTrace(); - } + if (node != null) { + Object outDoc; + try { + if ( outClass == String.class ) { + outDoc = this.mapper.writeValueAsString(node); } else { - node = this.mapper.convertValue(inDoc, ObjectNode.class); + outDoc = this.mapper.convertValue(node, outClass); } - - if(node != null) { - Object outDoc; - try { - if( outClass == String.class ) - outDoc = this.mapper.writeValueAsString(node); - else - outDoc = this.mapper.convertValue(node, outClass); - - StreamsDatum outDatum = new StreamsDatum(outDoc, entry.getId(), entry.getTimestamp(), entry.getSequenceid()); - outDatum.setMetadata(entry.getMetadata()); - result.add(outDatum); - } catch (Throwable e) { - LOGGER.warn(e.getMessage()); - LOGGER.warn(node.toString()); - } - } - - return result; + StreamsDatum outDatum = new StreamsDatum(outDoc, entry.getId(), entry.getTimestamp(), entry.getSequenceid()); + outDatum.setMetadata(entry.getMetadata()); + result.add(outDatum); + } catch (Throwable ex) { + LOGGER.warn(ex.getMessage()); + LOGGER.warn(node.toString()); + } } - @Override - public void prepare(Object configurationObject) { - if( formats.size() > 0 ) - this.mapper = StreamsJacksonMapper.getInstance(formats); - else - this.mapper = StreamsJacksonMapper.getInstance(); + return result; + } + + @Override + public void prepare(Object configurationObject) { + if ( formats.size() > 0 ) { + this.mapper = StreamsJacksonMapper.getInstance(formats); + } else { + this.mapper = StreamsJacksonMapper.getInstance(); } + } - @Override - public void cleanUp() { + @Override + public void cleanUp() { - } -}; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java index 1316d5c..e0759c3 100644 --- a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java +++ b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java @@ -18,78 +18,80 @@ package org.apache.streams.jackson.test; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.jackson.TypeConverterProcessor; -import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; + import org.junit.Test; import java.io.IOException; import java.util.List; -import static junit.framework.Assert.*; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; /** * */ public class TypeConverterProcessorTest { - private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\ "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709 31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_ https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}"; + private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\"c onfidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu.. .\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"title \":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":70931 384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_ht tps\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}"; - public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z"; + public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z"; - @Test - public void testTypeConverterStringToString() { - final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(String.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); - processor.prepare(null); - StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); - List<StreamsDatum> result = processor.process(datum); - assertNotNull(result); - assertEquals(1, result.size()); - StreamsDatum resultDatum = result.get(0); - assertNotNull(resultDatum); - assertNotNull(resultDatum.getDocument()); - assertTrue(resultDatum.getDocument() instanceof String); - assertEquals(ID, resultDatum.getId()); - } + @Test + public void testTypeConverterStringToString() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } - @Test - public void testTypeConverterStringToObjectNode() { - final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(String.class, ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); - processor.prepare(null); - StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); - List<StreamsDatum> result = processor.process(datum); - assertNotNull(result); - assertEquals(1, result.size()); - StreamsDatum resultDatum = result.get(0); - assertNotNull(resultDatum); - assertNotNull(resultDatum.getDocument()); - assertTrue(resultDatum.getDocument() instanceof ObjectNode); - assertEquals(ID, resultDatum.getId()); - } + @Test + public void testTypeConverterStringToObjectNode() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof ObjectNode); + assertEquals(ID, resultDatum.getId()); + } - @Test - public void testTypeConverterObjectNodeToString() throws IOException { - final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); - processor.prepare(null); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT)); - ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class); - StreamsDatum datum = new StreamsDatum(node, ID); - List<StreamsDatum> result = processor.process(datum); - assertNotNull(result); - assertEquals(1, result.size()); - StreamsDatum resultDatum = result.get(0); - assertNotNull(resultDatum); - assertNotNull(resultDatum.getDocument()); - assertTrue(resultDatum.getDocument() instanceof String); - assertEquals(ID, resultDatum.getId()); - } + @Test + public void testTypeConverterObjectNodeToString() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT)); + ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java index 24288f1..c2c3705 100644 --- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java +++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java @@ -18,18 +18,21 @@ package org.apache.streams.json; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.collect.Lists; import com.jayway.jsonpath.JsonPath; + import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; + import org.apache.commons.lang3.StringUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,118 +41,118 @@ import java.util.List; /** * Provides a base implementation for extracting json fields and - * objects from datums using JsonPath syntax + * objects from datums using JsonPath syntax. */ public class JsonPathExtractor implements StreamsProcessor { - private final static String STREAMS_ID = "JsonPathExtractor"; + private static final String STREAMS_ID = "JsonPathExtractor"; - private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private String pathExpression; - private JsonPath jsonPath; + private String pathExpression; + private JsonPath jsonPath; - public JsonPathExtractor() { - LOGGER.info("creating JsonPathExtractor"); - } + public JsonPathExtractor() { + LOGGER.info("creating JsonPathExtractor"); + } - public JsonPathExtractor(String pathExpression) { - this.pathExpression = pathExpression; - LOGGER.info("creating JsonPathExtractor for " + this.pathExpression); - } + public JsonPathExtractor(String pathExpression) { + this.pathExpression = pathExpression; + LOGGER.info("creating JsonPathExtractor for " + this.pathExpression); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = Lists.newArrayList(); - String json = null; + String json = null; - LOGGER.debug("{} processing {}", STREAMS_ID); + LOGGER.debug("{} processing {}", STREAMS_ID); - if( entry.getDocument() instanceof ObjectNode ) { - ObjectNode node = (ObjectNode) entry.getDocument(); - try { - json = mapper.writeValueAsString(node); - } catch (JsonProcessingException e) { - LOGGER.warn(e.getMessage()); - } - } else if( entry.getDocument() instanceof String ) { - json = (String) entry.getDocument(); - } + if ( entry.getDocument() instanceof ObjectNode ) { + ObjectNode node = (ObjectNode) entry.getDocument(); + try { + json = mapper.writeValueAsString(node); + } catch (JsonProcessingException ex) { + LOGGER.warn(ex.getMessage()); + } + } else if ( entry.getDocument() instanceof String ) { + json = (String) entry.getDocument(); + } - if( StringUtils.isNotEmpty(json)) { - - try { - Object readResult = jsonPath.read(json); - - if (readResult instanceof String) { - String match = (String) readResult; - LOGGER.info("Matched String: " + match); - StreamsDatum matchDatum = new StreamsDatum(match); - result.add(matchDatum); - } else if (readResult instanceof JSONObject) { - JSONObject match = (JSONObject) readResult; - LOGGER.info("Matched Object: " + match); - ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class); - StreamsDatum matchDatum = new StreamsDatum(objectNode); - result.add(matchDatum); - } else if (readResult instanceof JSONArray) { - LOGGER.info("Matched Array:"); - JSONArray array = (JSONArray) readResult; - Iterator iterator = array.iterator(); - while (iterator.hasNext()) { - Object item = iterator.next(); - if( item instanceof String ) { - LOGGER.info("String Item:" + item); - String match = (String) item; - StreamsDatum matchDatum = new StreamsDatum(match); - result.add(matchDatum); - } else if ( item instanceof JSONObject ) { - LOGGER.info("Object Item:" + item); - JSONObject match = (JSONObject) item; - ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class); - StreamsDatum matchDatum = new StreamsDatum(objectNode); - result.add(matchDatum); - } else { - LOGGER.info("Other Item:" + item.toString()); - } - } - } else { - LOGGER.info("Other Match:" + readResult.toString()); - } - - } catch( Exception e ) { - LOGGER.warn(e.getMessage()); + if ( StringUtils.isNotEmpty(json)) { + + try { + Object readResult = jsonPath.read(json); + + if (readResult instanceof String) { + String match = (String) readResult; + LOGGER.info("Matched String: " + match); + StreamsDatum matchDatum = new StreamsDatum(match); + result.add(matchDatum); + } else if (readResult instanceof JSONObject) { + JSONObject match = (JSONObject) readResult; + LOGGER.info("Matched Object: " + match); + ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class); + StreamsDatum matchDatum = new StreamsDatum(objectNode); + result.add(matchDatum); + } else if (readResult instanceof JSONArray) { + LOGGER.info("Matched Array:"); + JSONArray array = (JSONArray) readResult; + Iterator iterator = array.iterator(); + while (iterator.hasNext()) { + Object item = iterator.next(); + if ( item instanceof String ) { + LOGGER.info("String Item:" + item); + String match = (String) item; + StreamsDatum matchDatum = new StreamsDatum(match); + result.add(matchDatum); + } else if ( item instanceof JSONObject ) { + LOGGER.info("Object Item:" + item); + JSONObject match = (JSONObject) item; + ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class); + StreamsDatum matchDatum = new StreamsDatum(objectNode); + result.add(matchDatum); + } else { + LOGGER.info("Other Item:" + item.toString()); } - + } } else { - LOGGER.warn("result empty"); + LOGGER.info("Other Match:" + readResult.toString()); } - return result; + } catch ( Exception ex ) { + LOGGER.warn(ex.getMessage()); + } + } else { + LOGGER.warn("result empty"); } - @Override - public void prepare(Object configurationObject) { - if( configurationObject instanceof String ) - jsonPath = JsonPath.compile((String)(configurationObject)); - else if( configurationObject instanceof String[] ) - jsonPath = JsonPath.compile(((String[])(configurationObject))[0]); + return result; - mapper.registerModule(new JsonOrgModule()); - } + } - @Override - public void cleanUp() { - LOGGER.info("shutting down JsonPathExtractor for " + this.pathExpression); + @Override + public void prepare(Object configurationObject) { + if ( configurationObject instanceof String ) { + jsonPath = JsonPath.compile((String) (configurationObject)); + } else if ( configurationObject instanceof String[] ) { + jsonPath = JsonPath.compile(((String[]) (configurationObject))[0]); } -}; + mapper.registerModule(new JsonOrgModule()); + } + + @Override + public void cleanUp() { + LOGGER.info("shutting down JsonPathExtractor for " + this.pathExpression); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java index fcf34d7..ec741c2 100644 --- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java +++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java @@ -19,6 +19,10 @@ package org.apache.streams.json; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,12 +32,11 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.jayway.jsonpath.JsonPath; + import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; + import org.apache.commons.lang3.StringUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,134 +46,133 @@ import java.util.Map; /** * Provides a base implementation for filtering datums which - * do not contain specific fields using JsonPath syntax + * do not contain specific fields using JsonPath syntax. */ public class JsonPathFilter implements StreamsProcessor { - private final static String STREAMS_ID = "JsonPathFilter"; + private static final String STREAMS_ID = "JsonPathFilter"; - private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private String pathExpression; - private JsonPath jsonPath; - private String destNodeName; - - public JsonPathFilter() { - LOGGER.info("creating JsonPathFilter"); - } + private String pathExpression; + private JsonPath jsonPath; + private String destNodeName; - public JsonPathFilter(String pathExpression) { - this.pathExpression = pathExpression; - LOGGER.info("creating JsonPathFilter for " + this.pathExpression); - } + public JsonPathFilter() { + LOGGER.info("creating JsonPathFilter"); + } - @Override - public String getId() { - return STREAMS_ID; - } + public JsonPathFilter(String pathExpression) { + this.pathExpression = pathExpression; + LOGGER.info("creating JsonPathFilter for " + this.pathExpression); + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - List<StreamsDatum> result = Lists.newArrayList(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - String json = null; + List<StreamsDatum> result = Lists.newArrayList(); - ObjectNode document = null; + String json = null; - LOGGER.debug("{} processing {}", STREAMS_ID); + ObjectNode document = null; - if( entry.getDocument() instanceof ObjectNode ) { - document = (ObjectNode) entry.getDocument(); - try { - json = mapper.writeValueAsString(document); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - } else if( entry.getDocument() instanceof String ) { - json = (String) entry.getDocument(); - try { - document = mapper.readValue(json, ObjectNode.class); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - } + LOGGER.debug("{} processing {}", STREAMS_ID); - Preconditions.checkNotNull(document); - - if( StringUtils.isNotEmpty(json)) { - - Object srcResult = null; - try { - srcResult = jsonPath.read(json); - - } catch( Exception e ) { - e.printStackTrace(); - LOGGER.warn(e.getMessage()); - } - - Preconditions.checkNotNull(srcResult); - - String[] path = StringUtils.split(pathExpression, '.'); - ObjectNode node = document; - for (int i = 1; i < path.length-1; i++) { - node = (ObjectNode) document.get(path[i]); - } - - Preconditions.checkNotNull(node); - - if( srcResult instanceof JSONArray ) { - try { - ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class); - if( jsonNode.size() == 1 ) { - JsonNode item = jsonNode.get(0); - node.set(destNodeName, item); - } else { - node.set(destNodeName, jsonNode); - } - } catch (Exception e) { - LOGGER.warn(e.getMessage()); - } - } else if( srcResult instanceof JSONObject ) { - try { - ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class); - node.set(destNodeName, jsonNode); - } catch (Exception e) { - LOGGER.warn(e.getMessage()); - } - } else if( srcResult instanceof String ) { - try { - node.put(destNodeName, (String) srcResult); - } catch (Exception e) { - LOGGER.warn(e.getMessage()); - } - } + if ( entry.getDocument() instanceof ObjectNode ) { + document = (ObjectNode) entry.getDocument(); + try { + json = mapper.writeValueAsString(document); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } + } else if ( entry.getDocument() instanceof String ) { + json = (String) entry.getDocument(); + try { + document = mapper.readValue(json, ObjectNode.class); + } catch (IOException ex) { + ex.printStackTrace(); + return null; + } + } + Preconditions.checkNotNull(document); + + if ( StringUtils.isNotEmpty(json)) { + + Object srcResult = null; + try { + srcResult = jsonPath.read(json); + } catch ( Exception ex ) { + ex.printStackTrace(); + LOGGER.warn(ex.getMessage()); + } + + Preconditions.checkNotNull(srcResult); + + String[] path = StringUtils.split(pathExpression, '.'); + ObjectNode node = document; + for (int i = 1; i < path.length - 1; i++) { + node = (ObjectNode) document.get(path[i]); + } + + Preconditions.checkNotNull(node); + + if ( srcResult instanceof JSONArray ) { + try { + ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class); + if ( jsonNode.size() == 1 ) { + JsonNode item = jsonNode.get(0); + node.set(destNodeName, item); + } else { + node.set(destNodeName, jsonNode); + } + } catch (Exception ex) { + LOGGER.warn(ex.getMessage()); + } + } else if ( srcResult instanceof JSONObject ) { + try { + ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class); + node.set(destNodeName, jsonNode); + } catch (Exception ex) { + LOGGER.warn(ex.getMessage()); + } + } else if ( srcResult instanceof String ) { + try { + node.put(destNodeName, (String) srcResult); + } catch (Exception ex) { + LOGGER.warn(ex.getMessage()); } + } - result.add(new StreamsDatum(document)); + } - return result; + result.add(new StreamsDatum(document)); - } + return result; - @Override - public void prepare(Object configurationObject) { - if( configurationObject instanceof Map) { - Map<String,String> params = ( Map<String,String>) configurationObject; - pathExpression = params.get("pathExpression"); - jsonPath = JsonPath.compile(pathExpression); - destNodeName = pathExpression.substring(pathExpression.lastIndexOf(".") + 1); - } + } - mapper.registerModule(new JsonOrgModule()); + @Override + public void prepare(Object configurationObject) { + if ( configurationObject instanceof Map) { + Map<String,String> params = ( Map<String,String>) configurationObject; + pathExpression = params.get("pathExpression"); + jsonPath = JsonPath.compile(pathExpression); + destNodeName = pathExpression.substring(pathExpression.lastIndexOf(".") + 1); } - @Override - public void cleanUp() { - LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression); - } -}; + mapper.registerModule(new JsonOrgModule()); + } + + @Override + public void cleanUp() { + LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java index 2ab3b7f..1ab7c00 100644 --- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java +++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java @@ -19,83 +19,85 @@ package org.apache.streams.json.test; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.io.FileUtils; import org.apache.streams.core.StreamsDatum; import org.apache.streams.json.JsonPathExtractor; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.File; +import java.io.IOException; import java.util.List; -import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** * Test for extracting json fields and - * objects from datums using JsonPath syntax + * objects from datums using JsonPath syntax. */ public class JsonPathExtractorTest { - private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class); - private String testJson; + private String testJson; - @Before - public void initialize() { - try { - testJson = FileUtils.readFileToString(new File("src/test/resources/books.json")); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(); - } + @Before + public void initialize() { + try { + testJson = FileUtils.readFileToString(new File("src/test/resources/books.json")); + } catch (IOException ex) { + ex.printStackTrace(); + Assert.fail(); } + } - @Test - public void test1() - { - JsonPathExtractor extractor = new JsonPathExtractor(); - extractor.prepare("$.store.book[*].author"); - List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); - assertThat(result.size(), is(2)); - assertTrue(result.get(0).getDocument() instanceof String); - assertTrue(result.get(1).getDocument() instanceof String); - } + @Test + public void test1() + { + JsonPathExtractor extractor = new JsonPathExtractor(); + extractor.prepare("$.store.book[*].author"); + List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); + assertThat(result.size(), is(2)); + assertTrue(result.get(0).getDocument() instanceof String); + assertTrue(result.get(1).getDocument() instanceof String); + } - @Test - public void test2() - { - JsonPathExtractor extractor = new JsonPathExtractor(); - extractor.prepare("$.store.book[?(@.category == 'reference')]"); - List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); - assertThat(result.size(), is(1)); - assertTrue(result.get(0).getDocument() instanceof ObjectNode); - } + @Test + public void test2() + { + JsonPathExtractor extractor = new JsonPathExtractor(); + extractor.prepare("$.store.book[?(@.category == 'reference')]"); + List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); + assertThat(result.size(), is(1)); + assertTrue(result.get(0).getDocument() instanceof ObjectNode); + } - @Test - public void test3() - { - JsonPathExtractor extractor = new JsonPathExtractor(); - extractor.prepare("$.store.book[?(@.price > 10)]"); - List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); - assertThat(result.size(), is(1)); - assertTrue(result.get(0).getDocument() instanceof ObjectNode); - } + @Test + public void test3() + { + JsonPathExtractor extractor = new JsonPathExtractor(); + extractor.prepare("$.store.book[?(@.price > 10)]"); + List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); + assertThat(result.size(), is(1)); + assertTrue(result.get(0).getDocument() instanceof ObjectNode); + } - @Test - public void test4() - { - JsonPathExtractor extractor = new JsonPathExtractor(); - extractor.prepare("$.store.book[?(@.isbn)]"); - List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); - assertThat(result.size(), is(1)); - assertTrue(result.get(0).getDocument() instanceof ObjectNode); - } + @Test + public void test4() + { + JsonPathExtractor extractor = new JsonPathExtractor(); + extractor.prepare("$.store.book[?(@.isbn)]"); + List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson)); + assertThat(result.size(), is(1)); + assertTrue(result.get(0).getDocument() instanceof ObjectNode); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java index 5b23e16..04a680d 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java @@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,42 +34,49 @@ import java.util.HashMap; import java.util.Map; /** - * Enrich actor with account type + * Enrich actor with account type. */ public class AccountTypeProcessor extends SimpleHTTPGetProcessor { - private final static String STREAMS_ID = "AccountTypeProcessor"; + private static final String STREAMS_ID = "AccountTypeProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class); - public AccountTypeProcessor() { - this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); - } + /** + * AccountTypeProcessor constructor - resolves HttpProcessorConfiguration from JVM 'peoplepattern'. + */ + public AccountTypeProcessor() { + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); + } - public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { - super(peoplePatternConfiguration); - LOGGER.info("creating AccountTypeProcessor"); - configuration.setProtocol("https"); - configuration.setHostname("api.peoplepattern.com"); - configuration.setResourcePath("/v0.2/account_type/"); - configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); - configuration.setExtension("account_type"); - } + /** + * AccountTypeProcessor constructor - uses supplied HttpProcessorConfiguration. + * @param peoplePatternConfiguration peoplePatternConfiguration + */ + public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { + super(peoplePatternConfiguration); + LOGGER.info("creating AccountTypeProcessor"); + configuration.setProtocol("https"); + configuration.setHostname("api.peoplepattern.com"); + configuration.setResourcePath("/v0.2/account_type/"); + configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); + configuration.setExtension("account_type"); + } - /** - Override this to add parameters to the request - */ - @Override - protected Map<String, String> prepareParams(StreamsDatum entry) { - Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); - ActivityObject actor = mapper.convertValue(activity.getActor(), ActivityObject.class); - String username = (String) ExtensionUtil.getInstance().getExtension(actor, "screenName"); - Map<String, String> params = new HashMap<>(); - params.put("id", actor.getId()); - params.put("name", actor.getDisplayName()); - params.put("username", username); - params.put("description", actor.getSummary()); - return params; - } + /** + Override this to add parameters to the request. + */ + @Override + protected Map<String, String> prepareParams(StreamsDatum entry) { + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + ActivityObject actor = mapper.convertValue(activity.getActor(), ActivityObject.class); + String username = (String) ExtensionUtil.getInstance().getExtension(actor, "screenName"); + Map<String, String> params = new HashMap<>(); + params.put("id", actor.getId()); + params.put("name", actor.getDisplayName()); + params.put("username", username); + params.put("description", actor.getSummary()); + return params; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java index 1ee55a9..e615026 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java @@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,43 +34,50 @@ import java.util.HashMap; import java.util.Map; /** - * Enrich actor with demographics + * Enrich actor with demographics. */ public class DemographicsProcessor extends SimpleHTTPGetProcessor { - public final static String STREAMS_ID = "DemographicsProcessor"; + public static final String STREAMS_ID = "DemographicsProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class); - public DemographicsProcessor() { - this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); - } + /** + * DemographicsProcessor constructor - resolves HttpProcessorConfiguration from JVM 'peoplepattern'. + */ + public DemographicsProcessor() { + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); + } - public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { - super(peoplePatternConfiguration); - LOGGER.info("creating DemographicsProcessor"); - configuration.setProtocol("https"); - configuration.setHostname("api.peoplepattern.com"); - configuration.setResourcePath("/v0.2/demographics/"); - configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); - configuration.setExtension("demographics"); - } + /** + * AccountTypeProcessor constructor - uses supplied HttpProcessorConfiguration. + * @param peoplePatternConfiguration peoplePatternConfiguration + */ + public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { + super(peoplePatternConfiguration); + LOGGER.info("creating DemographicsProcessor"); + configuration.setProtocol("https"); + configuration.setHostname("api.peoplepattern.com"); + configuration.setResourcePath("/v0.2/demographics/"); + configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); + configuration.setExtension("demographics"); + } - /** - Override this to add parameters to the request - */ - @Override - protected Map<String, String> prepareParams(StreamsDatum entry) { - Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); - ActivityObject actor = mapper.convertValue(activity.getActor(), ActivityObject.class); - String username = (String) ExtensionUtil.getInstance().getExtension(actor, "screenName"); - Map<String, String> params = new HashMap<>(); - params.put("id", actor.getId()); - params.put("name", actor.getDisplayName()); - params.put("username", username); - params.put("description", actor.getSummary()); - return params; - } + /** + Override this to add parameters to the request. + */ + @Override + protected Map<String, String> prepareParams(StreamsDatum entry) { + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + ActivityObject actor = mapper.convertValue(activity.getActor(), ActivityObject.class); + String username = (String) ExtensionUtil.getInstance().getExtension(actor, "screenName"); + Map<String, String> params = new HashMap<>(); + params.put("id", actor.getId()); + params.put("name", actor.getDisplayName()); + params.put("username", username); + params.put("description", actor.getSummary()); + return params; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java index 0f46ccd..206931f 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java @@ -19,16 +19,18 @@ package org.apache.streams.regex; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,91 +46,93 @@ import java.util.Set; * modifying the appropriate {@link org.apache.streams.pojo.json.Activity} extensions object. */ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProcessor { - private final String patternConfigKey; - private final String extensionKey; - private final String defaultPattern; - private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRegexExtensionExtractor.class); + private final String patternConfigKey; + private final String extensionKey; + private final String defaultPattern; - private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegexExtensionExtractor.class); - private String pattern; + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - protected AbstractRegexExtensionExtractor(String patternConfigKey, String extensionKey, String defaultPattern) { - this.patternConfigKey = patternConfigKey; - this.extensionKey = extensionKey; - this.defaultPattern = defaultPattern; - } + private String pattern; - public String getPattern() { - return pattern; - } + protected AbstractRegexExtensionExtractor(String patternConfigKey, String extensionKey, String defaultPattern) { + this.patternConfigKey = patternConfigKey; + this.extensionKey = extensionKey; + this.defaultPattern = defaultPattern; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - Activity activity; - if (entry.getDocument() instanceof Activity) { - activity = (Activity) entry.getDocument(); - } else if (entry.getDocument() instanceof ObjectNode) { - activity = mapper.convertValue(entry.getDocument(), Activity.class); - } else { - return new ArrayList<>(); - } - if (Strings.isNullOrEmpty(pattern)) { - prepare(null); - } - Map<String, List<Integer>> matches = RegexUtils.extractMatches(pattern, activity.getContent()); - Collection<T> entities = ensureTargetObject(activity); - for (String key : matches.keySet()) { - entities.add(prepareObject(key)); - } - - Set<T> set = new HashSet<>(); - set.addAll(entities); - entities.clear(); - entities.addAll(set); - - entry.setDocument(activity); - return Lists.newArrayList(entry); - } + public String getPattern() { + return pattern; + } - @Override - public void prepare(Object configurationObject) { - if (configurationObject instanceof Map) { - if (((Map) configurationObject).containsKey(patternConfigKey)) { - pattern = (String) ((Map) configurationObject).get(patternConfigKey); - } - } else if (configurationObject instanceof String) { - pattern = (String) configurationObject; - } else { - pattern = defaultPattern; - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + Activity activity; + if (entry.getDocument() instanceof Activity) { + activity = (Activity) entry.getDocument(); + } else if (entry.getDocument() instanceof ObjectNode) { + activity = mapper.convertValue(entry.getDocument(), Activity.class); + } else { + return new ArrayList<>(); } - - @Override - public void cleanUp() { - //NOP + if (Strings.isNullOrEmpty(pattern)) { + prepare(null); + } + Map<String, List<Integer>> matches = RegexUtils.extractMatches(pattern, activity.getContent()); + Collection<T> entities = ensureTargetObject(activity); + for (String key : matches.keySet()) { + entities.add(prepareObject(key)); } - /** - * Configures the value to be persisted to the extensions object - * @param extracted the value extracted by the regex - * @return an object representing the appropriate extension - */ - protected abstract T prepareObject(String extracted); - - @SuppressWarnings("unchecked") - protected Collection<T> ensureTargetObject(Activity activity) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - Set<T> hashtags; - if(extensions.containsKey(extensionKey) && extensions.get(extensionKey) != null) { - hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey)); - } else { - hashtags = new HashSet<>(); - } - - extensions.put(extensionKey, hashtags); - - return hashtags; + Set<T> set = new HashSet<>(); + set.addAll(entities); + entities.clear(); + entities.addAll(set); + + entry.setDocument(activity); + return Lists.newArrayList(entry); + } + + @Override + public void prepare(Object configurationObject) { + if (configurationObject instanceof Map) { + if (((Map) configurationObject).containsKey(patternConfigKey)) { + pattern = (String) ((Map) configurationObject).get(patternConfigKey); + } + } else if (configurationObject instanceof String) { + pattern = (String) configurationObject; + } else { + pattern = defaultPattern; + } + } + + @Override + public void cleanUp() { + //NOP + } + + /** + * Configures the value to be persisted to the extensions object. + * @param extracted the value extracted by the regex + * @return an object representing the appropriate extension + */ + protected abstract T prepareObject(String extracted); + + @SuppressWarnings("unchecked") + protected Collection<T> ensureTargetObject(Activity activity) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + Set<T> hashtags; + + if (extensions.containsKey(extensionKey) && extensions.get(extensionKey) != null) { + hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey)); + } else { + hashtags = new HashSet<>(); } + + extensions.put(extensionKey, hashtags); + + return hashtags; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java index a59193b..326d5fa 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java @@ -20,6 +20,7 @@ package org.apache.streams.regex; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,27 +28,27 @@ import org.slf4j.LoggerFactory; * Processes the content of an {@link org.apache.streams.pojo.json.Activity} object to extract the Hashtags and add * them to the appropriate extensions object */ -public class RegexHashtagExtractor extends AbstractRegexExtensionExtractor<String> implements StreamsProcessor{ +public class RegexHashtagExtractor extends AbstractRegexExtensionExtractor<String> implements StreamsProcessor { - private final static String STREAMS_ID = "RegexHashtagExtractor"; + private static final String STREAMS_ID = "RegexHashtagExtractor"; - private final static Logger LOGGER = LoggerFactory.getLogger(RegexHashtagExtractor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RegexHashtagExtractor.class); - public final static String DEFAULT_PATTERN = "#\\w+"; - public final static String PATTERN_CONFIG_KEY = "HashtagPattern"; - public final static String EXTENSION_KEY = "hashtags"; + public static final String DEFAULT_PATTERN = "#\\w+"; + public static final String PATTERN_CONFIG_KEY = "HashtagPattern"; + public static final String EXTENSION_KEY = "hashtags"; - public RegexHashtagExtractor() { - super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN); - } + public RegexHashtagExtractor() { + super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - protected String prepareObject(String extracted) { - return extracted.substring(1); - } + @Override + protected String prepareObject(String extracted) { + return extracted.substring(1); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java index eabdb04..ddd3ed5 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java @@ -18,9 +18,11 @@ */ package org.apache.streams.regex; -import com.google.common.collect.Maps; + import org.apache.streams.core.StreamsProcessor; +import com.google.common.collect.Maps; + import java.util.HashMap; import java.util.Map; @@ -30,27 +32,27 @@ import java.util.Map; */ public class RegexMentionsExtractor extends AbstractRegexExtensionExtractor<Map<String, Object>> implements StreamsProcessor { - private final static String STREAMS_ID = "RegexMentionsExtractor"; + private static final String STREAMS_ID = "RegexMentionsExtractor"; - public static final String DEFAULT_PATTERN = "@\\w+"; - public static final String PATTERN_CONFIG_KEY = "MentionPattern"; - public static final String EXTENSION_KEY = "user_mentions"; - public static final String DISPLAY_KEY = "displayName"; + public static final String DEFAULT_PATTERN = "@\\w+"; + public static final String PATTERN_CONFIG_KEY = "MentionPattern"; + public static final String EXTENSION_KEY = "user_mentions"; + public static final String DISPLAY_KEY = "displayName"; - public RegexMentionsExtractor() { - super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN); - } + public RegexMentionsExtractor() { + super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - protected Map<String, Object> prepareObject(String extracted) { - HashMap<String, Object> mention = Maps.newHashMap(); - mention.put(DISPLAY_KEY, extracted.substring(1)); - return mention; - } + @Override + protected Map<String, Object> prepareObject(String extracted) { + HashMap<String, Object> mention = Maps.newHashMap(); + mention.put(DISPLAY_KEY, extracted.substring(1)); + return mention; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java index 84d3257..ea8474d 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java @@ -30,46 +30,46 @@ import java.util.Collection; */ public class RegexUrlExtractor extends AbstractRegexExtensionExtractor<String> implements StreamsProcessor { - private final static String STREAMS_ID = "RegexUrlExtractor"; + private static final String STREAMS_ID = "RegexUrlExtractor"; - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - //Temporarily copied from streams-processor-urls so as not to force a dependency on that provider. This should - //be moved to a common utility package - public final static String DEFAULT_PATTERN = - "(?:(?:https?|ftp)://)" + - "(?:\\S+(?::\\S*)?@)?" + - "(?:" + - "(?!(?:10|127)(?:\\.\\d{1,3}){3})" + - "(?!(?:169\\.254|192\\.168)(?:\\.\\d{1,3}){2})" + - "(?!172\\.(?:1[6-9]|2\\d|3[0-1])(?:\\.\\d{1,3}){2})" + - "(?:[1-9]\\d?|1\\d\\d|2[01]\\d|22[0-3])" + - "(?:\\.(?:1?\\d{1,2}|2[0-4]\\d|25[0-5])){2}" + - "(?:\\.(?:[1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4]))" + - "|" + - "(?:(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)" + - "(?:\\.(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)*" + - "(?:\\.(?:[a-z\\u00a1-\\uffff]{2,}))" + - ")" + - "(?::\\d{2,5})?" + - "(?:/[^\\s]*)?"; + //Temporarily copied from streams-processor-urls so as not to force a dependency on that provider. This should + //be moved to a common utility package + public static final String DEFAULT_PATTERN = + "(?:(?:https?|ftp)://)" + + "(?:\\S+(?::\\S*)?@)?" + + "(?:" + + "(?!(?:10|127)(?:\\.\\d{1,3}){3})" + + "(?!(?:169\\.254|192\\.168)(?:\\.\\d{1,3}){2})" + + "(?!172\\.(?:1[6-9]|2\\d|3[0-1])(?:\\.\\d{1,3}){2})" + + "(?:[1-9]\\d?|1\\d\\d|2[01]\\d|22[0-3])" + + "(?:\\.(?:1?\\d{1,2}|2[0-4]\\d|25[0-5])){2}" + + "(?:\\.(?:[1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4]))" + + "|" + + "(?:(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)" + + "(?:\\.(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)*" + + "(?:\\.(?:[a-z\\u00a1-\\uffff]{2,}))" + + ")" + + "(?::\\d{2,5})?" + + "(?:/[^\\s]*)?"; - public final static String PATTERN_CONFIG_KEY = "URLPattern"; + public static final String PATTERN_CONFIG_KEY = "URLPattern"; - public RegexUrlExtractor() { - super(PATTERN_CONFIG_KEY, null, DEFAULT_PATTERN); - } + public RegexUrlExtractor() { + super(PATTERN_CONFIG_KEY, null, DEFAULT_PATTERN); + } - @Override - protected String prepareObject(String extracted) { - return extracted; - } + @Override + protected String prepareObject(String extracted) { + return extracted; + } - @Override - protected Collection<String> ensureTargetObject(Activity activity) { - return activity.getLinks(); - } + @Override + protected Collection<String> ensureTargetObject(Activity activity) { + return activity.getLinks(); + } }
