hdfs-pullarticles working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cc9bc046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cc9bc046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cc9bc046 Branch: refs/heads/master Commit: cc9bc04683dcfb7d6150223f541e4e82fc40588c Parents: 0c31066 Author: sblackmon <[email protected]> Authored: Thu Apr 3 02:38:47 2014 -0500 Committer: sblackmon <[email protected]> Committed: Thu Apr 3 02:38:47 2014 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 1 + .../org/apache/streams/tika/LinkExpander.java | 12 +--- .../org/apache/streams/tika/TikaProcessor.java | 69 +++++++++++++------- .../apache/streams/tika/BoilerPipeArticle.json | 7 -- 4 files changed, 48 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 2d2d27c..a796dad 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -44,6 +44,7 @@ <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> + <!--<module>streams-processor-lucene</module>--> <module>streams-processor-tika</module> <module>streams-processor-urls</module> <module>streams-provider-datasift</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java index fe0e898..e4c0cef 100644 --- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java +++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java @@ -50,14 +50,6 @@ public class LinkExpander extends LinkUnwinder private BoilerPipeArticle article = new BoilerPipeArticle(); - // sblackmon: I put this here so I wouldn't get NullPointerExceptions when serializing results - public TextBlock getContentTextBlock() { - for(TextBlock textBlock : article.getTextBlocks()) - if(textBlock.isContent()) - return textBlock; - return null; - } - private static final Collection<String> AUTHOR_SEARCH = new ArrayList<String>() {{ add("og:author"); add("dc:author"); @@ -133,6 +125,9 @@ public class LinkExpander extends LinkUnwinder expandLink(); } + public BoilerPipeArticle getArticle() { + return article; + } private void expandLink() { @@ -186,7 +181,6 @@ public class LinkExpander extends LinkUnwinder boilerpipeContentHandler, rawMetaData); - article.setTextBlocks(boilerpipeContentHandler.getTextDocument().getTextBlocks()); article.setBody(boilerpipeContentHandler.getTextDocument().getContent()); article.setTitle(boilerpipeContentHandler.getTextDocument().getTitle()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java index b2f337d..7609635 100644 --- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java +++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java @@ -1,5 +1,6 @@ package org.apache.streams.tika; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.collect.Lists; @@ -39,38 +40,46 @@ public class TikaProcessor implements StreamsProcessor LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass()); + Activity activity; + + System.out.println( STREAMS_ID + " processing " + entry.getDocument().getClass()); // get list of shared urls if( entry.getDocument() instanceof Activity) { - Activity input = (Activity) entry.getDocument(); - - List<String> outputLinks = input.getLinks(); - // for each - for( String link : outputLinks ) { - if( link instanceof String ) { - // expand - try { - StreamsDatum outputDatum = expandLink((String) link, entry); - result.add(outputDatum); - } catch (Exception e) { - //drop unexpandable links - LOGGER.debug("Failed to expand link : {}", link); - LOGGER.debug("Excpetion expanding link : {}", e); - } - } - else { - LOGGER.warn("Expected Links to be of type java.lang.String, but received {}", link.getClass().toString()); - } - } - + activity = (Activity) entry.getDocument(); } else if(entry.getDocument() instanceof String) { - StreamsDatum outputDatum = expandLink((String) entry.getDocument(), entry); - result.add(outputDatum); + + try { + activity = mapper.readValue((String) entry.getDocument(), Activity.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn(e.getMessage()); + return(Lists.newArrayList(entry)); + } + } else throw new NotImplementedException(); + List<String> outputLinks = activity.getLinks(); + // for each + for( String link : outputLinks ) { + + System.out.println( "pulling " + link); + + try { + StreamsDatum outputDatum = expandLink(link, entry); + if( outputDatum != null ) + result.add(outputDatum); + } catch (Exception e) { + //drop unexpandable links + LOGGER.debug("Failed to expand link : {}", link); + LOGGER.debug("Excpetion expanding link : {}", e); + } + + } + return result; } @@ -80,9 +89,19 @@ public class TikaProcessor implements StreamsProcessor expander.run(); StreamsDatum datum = null; if(input.getId() == null) - datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), expander.getFinalURL()); + try { + datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), expander.getFinalURL()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } else - datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), input.getId()); + try { + datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), input.getId()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } datum.setSequenceid(input.getSequenceid()); datum.setMetadata(input.getMetadata()); datum.setTimestamp(input.getTimestamp()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json index a23b13e..137a4be 100644 --- a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json +++ b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json @@ -50,13 +50,6 @@ } } }, - "textBlocks": { - "type": "array", - "items": { - "javaType": "de.l3s.boilerpipe.document.TextBlock", - "type": "object" - } - }, "keywords": { "type": "array", "uniqueItems": true,
