Repository: incubator-streams Updated Branches: refs/heads/STREAMS-26 d764d7c96 -> 34990168b
found and fixed failure condition in moreoverclient made HDFS id (column 1) the default ES id Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34990168 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34990168 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34990168 Branch: refs/heads/STREAMS-26 Commit: 34990168bf9d01540d9100162ef5d04a48d35a32 Parents: d764d7c Author: sblackmon <[email protected]> Authored: Mon Mar 17 14:51:40 2014 -0500 Committer: sblackmon <[email protected]> Committed: Mon Mar 17 14:51:40 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchPersistWriter.java | 6 ++-- .../streams/hdfs/WebHdfsPersistReader.java | 1 + .../streams/hdfs/WebHdfsPersistReaderTask.java | 10 ++++-- .../streams/data/moreover/MoreoverClient.java | 10 +++++- .../streams/data/moreover/MoreoverResult.java | 35 +++++++++++++++----- .../org/apache/streams/core/StreamsDatum.java | 3 +- 6 files changed, 49 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index c4fd9f0..595011b 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -133,9 +133,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab add(config.getIndex(), config.getType(), id, json); - } catch (JsonProcessingException e) { - LOGGER.warn("{} {}", e.getLocation(), e.getMessage()); - + } catch (Exception e) { + LOGGER.warn("{} {}", e.getMessage()); + e.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index cf4c146..659c517 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -2,6 +2,7 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Queues; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index e8c1695..6cd1e79 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -1,13 +1,16 @@ package org.apache.streams.hdfs; +import com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; import org.apache.streams.core.StreamsDatum; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.Calendar; import java.util.Random; public class WebHdfsPersistReaderTask implements Runnable { @@ -34,9 +37,12 @@ public class WebHdfsPersistReaderTask implements Runnable { do{ try { line = bufferedReader.readLine(); - if( line != null ) { + if( !Strings.isNullOrEmpty(line) ) { String[] fields = line.split(Character.toString(reader.DELIMITER)); - reader.persistQueue.offer(new StreamsDatum(fields[3])); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(new Long(fields[2])); + StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime())); + reader.persistQueue.offer(entry); } } catch (Exception e) { LOGGER.warn("Failed processing " + line); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java index a43c4d8..b5888c3 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java @@ -40,9 +40,13 @@ public class MoreoverClient { logger.debug("Making call to {}", urlString); long start = System.nanoTime(); MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime()); - logger.debug("Maximum sequence from last call {}", result.getMaxSequencedId()); if(!result.getMaxSequencedId().equals(BigInteger.ZERO)) + { this.lastSequenceId = result.getMaxSequencedId(); + logger.debug("Maximum sequence from last call {}", this.lastSequenceId); + } + else + logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId); return result; } @@ -79,6 +83,10 @@ public class MoreoverClient { IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer); writer.flush(); pullTime = new Date().getTime(); + + // added after seeing java.net.SocketException: Too many open files + cn.disconnect(); + return writer.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java index 50e3f66..0ef49c5 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java @@ -3,6 +3,7 @@ package org.apache.streams.data.moreover; import com.fasterxml.aalto.stax.InputFactoryImpl; import com.fasterxml.aalto.stax.OutputFactoryImpl; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -112,18 +113,34 @@ public class MoreoverResult implements Iterable<StreamsDatum> { try { this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class); - this.articles = resultObject.getArticles(); - this.articleArray = articles.getArticle(); - } catch (IOException e) { + } catch (JsonMappingException e) { + // theory is this may not be fatal + this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom(); + } catch (Exception e) { e.printStackTrace(); + logger.warn("Unable to process document:"); + logger.warn(xmlString); + } + + if( this.resultObject.getStatus().equals("FAILURE")) + { + logger.warn(this.resultObject.getStatus()); + logger.warn(this.resultObject.getMessageCode()); + logger.warn(this.resultObject.getUserMessage()); + logger.warn(this.resultObject.getDeveloperMessage()); } + else + { + this.articles = resultObject.getArticles(); + this.articleArray = articles.getArticle(); - for (Article article : articleArray) { - BigInteger sequenceid = new BigInteger(article.getSequenceId()); - list.add(new StreamsDatum(article, sequenceid)); - logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); - if (sequenceid.compareTo(this.maxSequencedId) > 0) { - this.maxSequencedId = sequenceid; + for (Article article : articleArray) { + BigInteger sequenceid = new BigInteger(article.getSequenceId()); + list.add(new StreamsDatum(article, sequenceid)); + logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); + if (sequenceid.compareTo(this.maxSequencedId) > 0) { + this.maxSequencedId = sequenceid; + } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java index 7e7e553..78623b0 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java @@ -136,6 +136,7 @@ public class StreamsDatum implements Serializable { @Override public String toString() { - return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid; + return this.id+"\tDocument="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid; } + }
