implements ordered fields and metadata
needs tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d2b3e21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d2b3e21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d2b3e21

Branch: refs/heads/asf-master
Commit: 8d2b3e219fd1ba82078bb0cb09c37595cd0e0d00
Parents: aac4863
Author: sblackmon <sblack...@apache.org>
Authored: Tue Mar 3 19:55:03 2015 -0600
Committer: sblackmon <sblack...@apache.org>
Committed: Tue Mar 3 19:55:03 2015 -0600

----------------------------------------------------------------------
 streams-contrib/streams-persist-hdfs/pom.xml    |  4 +-
 .../org/apache/streams/hdfs/HdfsConstants.java  | 13 ++++
 .../streams/hdfs/WebHdfsPersistReader.java      |  2 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 78 ++++++++++++++------
 .../streams/hdfs/WebHdfsPersistWriter.java      | 45 +++++++----
 .../apache/streams/hdfs/HdfsConfiguration.json  | 20 +++++
 6 files changed, 123 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index d924fb8..9bf86ed 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -115,9 +115,7 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema</sourcePath>
                     </sourcePaths>
                     
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.hdfs.pojo</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
new file mode 100644
index 0000000..ab0ca1a
--- /dev/null
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
@@ -0,0 +1,13 @@
+package org.apache.streams.hdfs;
+
+/**
+ * Created by sblackmon on 3/3/15.
+ */
+public class HdfsConstants {
+
+    protected static final String ID = "ID";
+    protected static final String TS = "TS";
+    protected static final String META = "META";
+    protected static final String DOC = "DOC";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index b002d1a..02345a2 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -64,7 +64,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     protected ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private HdfsReaderConfiguration hdfsConfiguration;
+    protected HdfsReaderConfiguration hdfsConfiguration;
 
     private ExecutorService executor;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 7475d0d..3f9b906 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -18,7 +18,9 @@
 
 package org.apache.streams.hdfs;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
@@ -28,7 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Map;
 
 public class WebHdfsPersistReaderTask implements Runnable {
 
@@ -102,34 +106,66 @@ public class WebHdfsPersistReaderTask implements Runnable {
 
     private StreamsDatum processLine(String line) {
 
-        String[] fields = line.split(Character.toString(reader.DELIMITER));
+        StreamsDatum datum;
+
+        String[] fields = line.split(reader.hdfsConfiguration.getFieldDelimiter());
 
         if( fields.length == 0)
             return null;
-        if( fields.length == 1)
-            return new StreamsDatum(fields[0]);
-        if( fields.length == 2)
-            return new StreamsDatum(fields[1], fields[0]);
-        if( fields.length == 3) {
-
-            DateTime timestamp = null;
-            try {
-                long longts = Long.parseLong(fields[1]);
-                timestamp = new DateTime(longts);
-            } catch ( Exception e ) {}
-            try {
-                timestamp = reader.mapper.readValue(fields[1], DateTime.class);
-            } catch ( Exception e ) {}
-
-            if( timestamp == null )
-                return new StreamsDatum(fields[0], fields[2]);
-            else
-                return new StreamsDatum(fields[0], fields[2], timestamp);
 
+        String id;
+        DateTime ts;
+        Map<String, Object> metadata;
+        String json;
+
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )) {
+            json = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)];
+            datum = new StreamsDatum(json);
+        } else {
+            return null;
+        }
+
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID ) ) {
+            id = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)];
+            datum.setId(id);
+        }
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )) {
+            ts = parseTs(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)]);
+            datum.setTimestamp(ts);
+        }
+        if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )) {
+            metadata = parseMap(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)]);
+            datum.setMetadata(metadata);
         }
 
-        return new StreamsDatum(fields[3], fields[0]);
+        return datum;
 
     }
 
+    private DateTime parseTs(String field) {
+
+        DateTime timestamp = null;
+        try {
+            long longts = Long.parseLong(field);
+            timestamp = new DateTime(longts);
+        } catch ( Exception e ) {}
+        try {
+            timestamp = reader.mapper.readValue(field, DateTime.class);
+        } catch ( Exception e ) {}
+
+        return timestamp;
+    }
+
+    private Map<String, Object> parseMap(String field) {
+
+        Map<String, Object> metadata = null;
+
+        try {
+            JsonNode jsonNode = reader.mapper.readValue(field, JsonNode.class);
+            metadata = reader.mapper.convertValue(jsonNode, Map.class);
+        } catch (IOException e) {
+            LOGGER.warn("failed in parseMap: " + e.getMessage());
+        }
+        return metadata;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 16b7f30..38ded28 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -20,7 +20,9 @@ package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +42,7 @@ import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 
@@ -71,7 +74,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
     private ObjectMapper mapper = new ObjectMapper();
 
-    private HdfsWriterConfiguration hdfsConfiguration;
+    protected HdfsWriterConfiguration hdfsConfiguration;
 
     public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
@@ -243,9 +246,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     }
 
     private String convertResultToString(StreamsDatum entry) {
-        String metadata = null;
+        String metadataJson = null;
         try {
-            metadata = mapper.writeValueAsString(entry.getMetadata());
+            metadataJson = mapper.writeValueAsString(entry.getMetadata());
         } catch (JsonProcessingException e) {
             LOGGER.warn("Error converting metadata to a string", e);
         }
@@ -259,17 +262,31 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
         if (Strings.isNullOrEmpty(documentJson))
             return null;
-        else
-            return new StringBuilder()
-                    .append(entry.getId())
-                    .append(DELIMITER)
-                    .append(entry.getTimestamp())
-                    .append(DELIMITER)
-                    .append(metadata)
-                    .append(DELIMITER)
-                    .append(documentJson)
-                    .append("\n")
-                    .toString();
+        else {
+            StringBuilder stringBuilder = new StringBuilder();
+            Iterator<String> fields = hdfsConfiguration.getFields().iterator();
+            List<String> fielddata = Lists.newArrayList();
+            Joiner joiner = Joiner.on(hdfsConfiguration.getFieldDelimiter()).useForNull("");
+            while( fields.hasNext() ) {
+                String field = fields.next();
+                if( field.equals(HdfsConstants.DOC) )
+                    fielddata.add(documentJson);
+                else if( field.equals(HdfsConstants.ID) )
+                    fielddata.add(entry.getId());
+                else if( field.equals(HdfsConstants.TS) )
+                    fielddata.add(entry.getTimestamp().toString());
+                else if( field.equals(HdfsConstants.META) )
+                    fielddata.add(metadataJson);
+                else if( entry.getMetadata().containsKey(field)) {
+                    fielddata.add(entry.getMetadata().get(field).toString());
+                } else {
+                    fielddata.add(null);
+                }
+
+            }
+            joiner.appendTo(stringBuilder, fielddata);
+            return stringBuilder.append(hdfsConfiguration.getLineDelimiter()).toString();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index 8d0a397..e6e1e4c 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -33,6 +33,26 @@
         "password": {
             "type": "string",
             "description": "Password"
+        },
+        "fields": {
+          "type": "array",
+          "items": {
+            "type": "string"
+          },
+          "default": [
+            "ID",
+            "TS",
+            "META",
+            "DOC"
+          ]
+        },
+        "field_delimiter": {
+          "type": "string",
+          "default": "\t"
+        },
+        "line_delimiter": {
+          "type": "string",
+          "default": "\n"
         }
     }
 }
\ No newline at end of file

Reply via email to