implements ordered fields and metadata; needs tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d2b3e21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d2b3e21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d2b3e21 Branch: refs/heads/asf-master Commit: 8d2b3e219fd1ba82078bb0cb09c37595cd0e0d00 Parents: aac4863 Author: sblackmon <sblack...@apache.org> Authored: Tue Mar 3 19:55:03 2015 -0600 Committer: sblackmon <sblack...@apache.org> Committed: Tue Mar 3 19:55:03 2015 -0600 ---------------------------------------------------------------------- streams-contrib/streams-persist-hdfs/pom.xml | 4 +- .../org/apache/streams/hdfs/HdfsConstants.java | 13 ++++ .../streams/hdfs/WebHdfsPersistReader.java | 2 +- .../streams/hdfs/WebHdfsPersistReaderTask.java | 78 ++++++++++++++------ .../streams/hdfs/WebHdfsPersistWriter.java | 45 +++++++---- .../apache/streams/hdfs/HdfsConfiguration.json | 20 +++++ 6 files changed, 123 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml index d924fb8..9bf86ed 100644 --- a/streams-contrib/streams-persist-hdfs/pom.xml +++ b/streams-contrib/streams-persist-hdfs/pom.xml @@ -115,9 +115,7 @@ <addCompileSourceRoot>true</addCompileSourceRoot> <generateBuilders>true</generateBuilders> <sourcePaths> - <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath> - <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath> - <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath> + 
<sourcePath>src/main/jsonschema</sourcePath> </sourcePaths> <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> <targetPackage>org.apache.streams.hdfs.pojo</targetPackage> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java new file mode 100644 index 0000000..ab0ca1a --- /dev/null +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java @@ -0,0 +1,13 @@ +package org.apache.streams.hdfs; + +/** + * Created by sblackmon on 3/3/15. + */ +public class HdfsConstants { + + protected static final String ID = "ID"; + protected static final String TS = "TS"; + protected static final String META = "META"; + protected static final String DOC = "DOC"; + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index b002d1a..02345a2 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -64,7 +64,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo protected ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private 
HdfsReaderConfiguration hdfsConfiguration; + protected HdfsReaderConfiguration hdfsConfiguration; private ExecutorService executor; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index 7475d0d..3f9b906 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -18,7 +18,9 @@ package org.apache.streams.hdfs; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileStatus; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; @@ -28,7 +30,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; +import java.util.Map; public class WebHdfsPersistReaderTask implements Runnable { @@ -102,34 +106,66 @@ public class WebHdfsPersistReaderTask implements Runnable { private StreamsDatum processLine(String line) { - String[] fields = line.split(Character.toString(reader.DELIMITER)); + StreamsDatum datum; + + String[] fields = line.split(reader.hdfsConfiguration.getFieldDelimiter()); if( fields.length == 0) return null; - if( fields.length == 1) - return new StreamsDatum(fields[0]); - if( fields.length == 2) - return new StreamsDatum(fields[1], fields[0]); - if( fields.length == 3) { - - DateTime timestamp = null; - try { - long longts = 
Long.parseLong(fields[1]); - timestamp = new DateTime(longts); - } catch ( Exception e ) {} - try { - timestamp = reader.mapper.readValue(fields[1], DateTime.class); - } catch ( Exception e ) {} - - if( timestamp == null ) - return new StreamsDatum(fields[0], fields[2]); - else - return new StreamsDatum(fields[0], fields[2], timestamp); + String id; + DateTime ts; + Map<String, Object> metadata; + String json; + + if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.DOC )) { + json = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.DOC)]; + datum = new StreamsDatum(json); + } else { + return null; + } + + if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.ID ) ) { + id = fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.ID)]; + datum.setId(id); + } + if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.TS )) { + ts = parseTs(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.TS)]); + datum.setTimestamp(ts); + } + if( reader.hdfsConfiguration.getFields().contains( HdfsConstants.META )) { + metadata = parseMap(fields[reader.hdfsConfiguration.getFields().indexOf(HdfsConstants.META)]); + datum.setMetadata(metadata); } - return new StreamsDatum(fields[3], fields[0]); + return datum; } + private DateTime parseTs(String field) { + + DateTime timestamp = null; + try { + long longts = Long.parseLong(field); + timestamp = new DateTime(longts); + } catch ( Exception e ) {} + try { + timestamp = reader.mapper.readValue(field, DateTime.class); + } catch ( Exception e ) {} + + return timestamp; + } + + private Map<String, Object> parseMap(String field) { + + Map<String, Object> metadata = null; + + try { + JsonNode jsonNode = reader.mapper.readValue(field, JsonNode.class); + metadata = reader.mapper.convertValue(jsonNode, Map.class); + } catch (IOException e) { + LOGGER.warn("failed in parseMap: " + e.getMessage()); + } + return metadata; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 16b7f30..38ded28 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -20,7 +20,9 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -40,6 +42,7 @@ import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Queue; @@ -71,7 +74,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl private ObjectMapper mapper = new ObjectMapper(); - private HdfsWriterConfiguration hdfsConfiguration; + protected HdfsWriterConfiguration hdfsConfiguration; public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) { this.hdfsConfiguration = hdfsConfiguration; @@ -243,9 +246,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl } private String convertResultToString(StreamsDatum entry) { - String metadata = null; + String metadataJson = null; try { - metadata = 
mapper.writeValueAsString(entry.getMetadata()); + metadataJson = mapper.writeValueAsString(entry.getMetadata()); } catch (JsonProcessingException e) { LOGGER.warn("Error converting metadata to a string", e); } @@ -259,17 +262,31 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl if (Strings.isNullOrEmpty(documentJson)) return null; - else - return new StringBuilder() - .append(entry.getId()) - .append(DELIMITER) - .append(entry.getTimestamp()) - .append(DELIMITER) - .append(metadata) - .append(DELIMITER) - .append(documentJson) - .append("\n") - .toString(); + else { + StringBuilder stringBuilder = new StringBuilder(); + Iterator<String> fields = hdfsConfiguration.getFields().iterator(); + List<String> fielddata = Lists.newArrayList(); + Joiner joiner = Joiner.on(hdfsConfiguration.getFieldDelimiter()).useForNull(""); + while( fields.hasNext() ) { + String field = fields.next(); + if( field.equals(HdfsConstants.DOC) ) + fielddata.add(documentJson); + else if( field.equals(HdfsConstants.ID) ) + fielddata.add(entry.getId()); + else if( field.equals(HdfsConstants.TS) ) + fielddata.add(entry.getTimestamp().toString()); + else if( field.equals(HdfsConstants.META) ) + fielddata.add(metadataJson); + else if( entry.getMetadata().containsKey(field)) { + fielddata.add(entry.getMetadata().get(field).toString()); + } else { + fielddata.add(null); + } + + } + joiner.appendTo(stringBuilder, fielddata); + return stringBuilder.append(hdfsConfiguration.getLineDelimiter()).toString(); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d2b3e21/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json 
b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json index 8d0a397..e6e1e4c 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json +++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json @@ -33,6 +33,26 @@ "password": { "type": "string", "description": "Password" + }, + "fields": { + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "ID", + "TS", + "META", + "DOC" + ] + }, + "field_delimiter": { + "type": "string", + "default": "\t" + }, + "line_delimiter": { + "type": "string", + "default": "\n" } } } \ No newline at end of file