Implements missing metadata handling (needs tests)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aac48633 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aac48633 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aac48633 Branch: refs/heads/asf-master Commit: aac486338b61aea4aac3d969618da6b642297b3e Parents: 78a49ee Author: sblackmon <sblack...@apache.org> Authored: Tue Mar 3 11:21:57 2015 -0600 Committer: sblackmon <sblack...@apache.org> Committed: Tue Mar 3 11:21:57 2015 -0600 ---------------------------------------------------------------------- .../streams/hdfs/WebHdfsPersistReader.java | 3 +- .../streams/hdfs/WebHdfsPersistReaderTask.java | 48 +++++++++++++++++--- 2 files changed, 43 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aac48633/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index c0213f9..b002d1a 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.core.*; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = new ObjectMapper(); + protected ObjectMapper mapper = StreamsJacksonMapper.getInstance(); private HdfsReaderConfiguration hdfsConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aac48633/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index 72d5581..7475d0d 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -22,6 +22,7 @@ import com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +62,14 @@ public class WebHdfsPersistReaderTask implements Runnable { line = bufferedReader.readLine(); if( !Strings.isNullOrEmpty(line) ) { reader.countersCurrent.incrementAttempt(); - String[] fields = line.split(Character.toString(reader.DELIMITER)); - // Temporarily disabling timestamp reads to make reader and writer compatible - // This capability will be restore in PR for STREAMS-169 - //StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2]))); - StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); - write( entry ); - reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + StreamsDatum entry = 
processLine(line); + if( entry != null ) { + write(entry); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + } else { + LOGGER.warn("processLine failed"); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } } } catch (Exception e) { e.printStackTrace(); @@ -98,4 +100,36 @@ public class WebHdfsPersistReaderTask implements Runnable { while( !success ); } + private StreamsDatum processLine(String line) { + + String[] fields = line.split(Character.toString(reader.DELIMITER)); + + if( fields.length == 0) + return null; + if( fields.length == 1) + return new StreamsDatum(fields[0]); + if( fields.length == 2) + return new StreamsDatum(fields[1], fields[0]); + if( fields.length == 3) { + + DateTime timestamp = null; + try { + long longts = Long.parseLong(fields[1]); + timestamp = new DateTime(longts); + } catch ( Exception e ) {} + try { + timestamp = reader.mapper.readValue(fields[1], DateTime.class); + } catch ( Exception e ) {} + + if( timestamp == null ) + return new StreamsDatum(fields[0], fields[2]); + else + return new StreamsDatum(fields[0], fields[2], timestamp); + + } + + return new StreamsDatum(fields[3], fields[0]); + + } + }