Implements missing metadata handling in the WebHDFS persist reader.
Still needs tests.

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aac48633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aac48633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aac48633

Branch: refs/heads/asf-master
Commit: aac486338b61aea4aac3d969618da6b642297b3e
Parents: 78a49ee
Author: sblackmon <sblack...@apache.org>
Authored: Tue Mar 3 11:21:57 2015 -0600
Committer: sblackmon <sblack...@apache.org>
Committed: Tue Mar 3 11:21:57 2015 -0600

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistReader.java      |  3 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 48 +++++++++++++++++---
 2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aac48633/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index c0213f9..b002d1a 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +62,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    protected ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     private HdfsReaderConfiguration hdfsConfiguration;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aac48633/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 72d5581..7475d0d 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,13 +62,14 @@ public class WebHdfsPersistReaderTask implements Runnable {
                         line = bufferedReader.readLine();
                         if( !Strings.isNullOrEmpty(line) ) {
                             reader.countersCurrent.incrementAttempt();
-                            String[] fields = 
line.split(Character.toString(reader.DELIMITER));
-                            // Temporarily disabling timestamp reads to make 
reader and writer compatible
-                            // This capability will be restore in PR for 
STREAMS-169
-                            //StreamsDatum entry = new StreamsDatum(fields[3], 
fields[0], new DateTime(Long.parseLong(fields[2])));
-                            StreamsDatum entry = new StreamsDatum(fields[3], 
fields[0]);
-                            write( entry );
-                            
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+                            StreamsDatum entry = processLine(line);
+                            if( entry != null ) {
+                                write(entry);
+                                
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+                            } else {
+                                LOGGER.warn("processLine failed");
+                                
reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+                            }
                         }
                     } catch (Exception e) {
                         e.printStackTrace();
@@ -98,4 +100,36 @@ public class WebHdfsPersistReaderTask implements Runnable {
         while( !success );
     }
 
+    /**
+     * Parses one delimited line read from HDFS into a StreamsDatum.
+     *
+     * The layout is chosen by the number of delimited fields:
+     *   1 field   - document
+     *   2 fields  - id, document
+     *   3 fields  - id, timestamp, document
+     *   4+ fields - id at index 0, document at index 3 (remaining fields ignored)
+     *
+     * @param line a non-empty line of text
+     * @return the parsed datum, or null when the line yields no fields
+     */
+    private StreamsDatum processLine(String line) {
+
+        // String.split interprets its argument as a regex; quote the delimiter so
+        // characters such as '|' or '.' are matched literally.
+        String[] fields = line.split(java.util.regex.Pattern.quote(Character.toString(reader.DELIMITER)));
+
+        if( fields.length == 0 ) {
+            return null;
+        }
+        if( fields.length == 1 ) {
+            return new StreamsDatum(fields[0]);
+        }
+        if( fields.length == 2 ) {
+            return new StreamsDatum(fields[1], fields[0]);
+        }
+        if( fields.length == 3 ) {
+            // Same argument order as the other branches: (document, id[, timestamp]),
+            // with the document in the last field and the id in the first.
+            DateTime timestamp = parseTimestamp(fields[1]);
+            if( timestamp == null ) {
+                return new StreamsDatum(fields[2], fields[0]);
+            }
+            return new StreamsDatum(fields[2], fields[0], timestamp);
+        }
+
+        return new StreamsDatum(fields[3], fields[0]);
+
+    }
+
+    /**
+     * Best-effort parse of a timestamp field.
+     *
+     * Tries, in order: epoch milliseconds, the reader's JSON mapper, then an
+     * ISO-8601 string.
+     *
+     * @param value the raw timestamp field
+     * @return the parsed timestamp, or null if no strategy recognises the value
+     */
+    private DateTime parseTimestamp(String value) {
+        try {
+            return new DateTime(Long.parseLong(value));
+        } catch( NumberFormatException ignored ) {
+            // not epoch millis; fall through to the next strategy
+        }
+        try {
+            return reader.mapper.readValue(value, DateTime.class);
+        } catch( Exception ignored ) {
+            // not a JSON representation the mapper understands; fall through
+        }
+        try {
+            return DateTime.parse(value);
+        } catch( IllegalArgumentException ignored ) {
+            // not ISO-8601 either; caller handles a missing timestamp
+        }
+        return null;
+    }
+
 }

Reply via email to