Repository: apex-malhar Updated Branches: refs/heads/master 99f89731b -> 4c651f990
APEXMALHAR-2152 - FSLoader fixed length support Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/faf7a607 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/faf7a607 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/faf7a607 Branch: refs/heads/master Commit: faf7a60744e6c0b24e5e9c455e5e777747eb582b Parents: 2e47b4c Author: shubham <[email protected]> Authored: Tue Aug 23 14:33:12 2016 +0530 Committer: shubham <[email protected]> Committed: Wed Aug 31 15:25:45 2016 -0700 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../contrib/enrich/FixedWidthFSLoader.java | 270 +++++++++++++++++++ .../contrib/enrich/FileEnrichmentTest.java | 69 ++++- .../src/test/resources/fixed-width-sample.txt | 6 + 4 files changed, 350 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 70a615c..84a7e05 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -658,5 +658,11 @@ <version>1.8.0.7</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.univocity</groupId> + <artifactId>univocity-parsers</artifactId> + <version>2.0.0</version> + <optional>true</optional> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java new file mode 100644 index 0000000..8b7eac0 --- /dev/null +++ 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.univocity.parsers.fixed.FixedWidthFields;
+import com.univocity.parsers.fixed.FixedWidthParser;
+import com.univocity.parsers.fixed.FixedWidthParserSettings;
+
+import com.datatorrent.contrib.parser.AbstractCsvParser.FIELD_TYPE;
+import com.datatorrent.contrib.parser.AbstractCsvParser.Field;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a
+ * fixed-width file. User needs to set {@link FixedWidthFSLoader#fieldDescription}
+ * to specify field information.
+ */
+@InterfaceStability.Evolving
+public class FixedWidthFSLoader extends FSLoader
+{
+
+  /**
+   * Field definitions parsed from {@link #fieldDescription}; built lazily on the
+   * first call to {@link #extractFields(String)}.
+   */
+  private transient List<FixedWidthField> fields;
+  /**
+   * Indicates whether first line of the file is a header. Default is false
+   */
+  private boolean hasHeader;
+
+  /**
+   * Specifies information related to fields in fixed-width file. Format is
+   * [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if FIELD_TYPE is DATE]. FIELD_TYPE
+   * can take one of the values of {@link FIELD_TYPE} i.e. BOOLEAN, DOUBLE,
+   * INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE e.g.
+   * Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40,
+   * Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\". Date format needs to be within
+   * quotes (" ") and uses {@link SimpleDateFormat} pattern letters, so month is
+   * MM (lowercase mm means minutes).
+   */
+  @NotNull
+  private String fieldDescription;
+
+  /**
+   * Array containing headers (the field names, in declaration order)
+   */
+  private transient String[] header;
+  /**
+   * Padding character. Default is white space.
+   */
+  private char padding = ' ';
+  private transient FixedWidthParser fixedWidthParser;
+  private transient boolean initialized;
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedWidthFSLoader.class);
+
+  /**
+   * Gets the option if file has header or not.
+   *
+   * @return hasHeader, indicating whether first line of the file is a header.
+   */
+  public boolean isHasHeader()
+  {
+    return hasHeader;
+  }
+
+  /**
+   * Set to true if file has header
+   *
+   * @param hasHeader
+   *          Indicates whether first line of the file is a header. Default is
+   *          false
+   */
+  public void setHasHeader(boolean hasHeader)
+  {
+    this.hasHeader = hasHeader;
+  }
+
+  /**
+   * Gets the field description
+   *
+   * @return fieldDescription. String specifying information related to fields
+   *         in fixed-width file.
+   */
+  public String getFieldDescription()
+  {
+    return fieldDescription;
+  }
+
+  /**
+   * Sets fieldDescription
+   *
+   * @param fieldDescription
+   *          a String specifying information related to fields in fixed-width
+   *          file. Format is [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if
+   *          FIELD_TYPE is DATE]. FIELD_TYPE can take one of the values of
+   *          {@link FIELD_TYPE}
+   *          e.g. Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:
+   *          STRING:40,Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\". Date format
+   *          needs to be within quotes (" ") and uses {@link SimpleDateFormat}
+   *          pattern letters (MM = month, mm = minutes)
+   */
+  public void setFieldDescription(String fieldDescription)
+  {
+    this.fieldDescription = fieldDescription;
+  }
+
+  /**
+   * Gets the character used for padding in the fixed-width file. Default is
+   * white space (' ')
+   *
+   * @return Padding character. Default is white space.
+   */
+  public char getPadding()
+  {
+    return padding;
+  }
+
+  /**
+   * Sets the character used for padding in fixed-width file. Default is white
+   * space (' ')
+   *
+   * @param padding
+   *          Padding character. Default is white space.
+   */
+  public void setPadding(char padding)
+  {
+    this.padding = padding;
+  }
+
+  /**
+   * {@link Field} extended with the column width and, for DATE fields, the
+   * date format.
+   */
+  public static class FixedWidthField extends Field
+  {
+    int width;
+    String dateFormat;
+
+    public int getWidth()
+    {
+      return width;
+    }
+
+    public void setWidth(int width)
+    {
+      this.width = width;
+    }
+
+    public String getDateFormat()
+    {
+      return dateFormat;
+    }
+
+    public void setDateFormat(String dateFormat)
+    {
+      this.dateFormat = dateFormat;
+    }
+
+  }
+
+  /**
+   * Extracts the fields from a fixed width record and returns a map containing
+   * field names and values.
+   *
+   * @param line one record of the fixed-width file
+   * @return map of field name to converted value, or null when the line is the
+   *         header (if {@link #hasHeader} is set) or the parser returns no
+   *         values for it
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    if (!initialized) {
+      init();
+      initialized = true;
+    }
+    String[] values = fixedWidthParser.parseLine(line);
+    if (values == null) {
+      // NOTE(review): the parser may return null for a line with no content; skip it
+      // the same way the header line is skipped instead of failing with an NPE
+      return null;
+    }
+    if (hasHeader && Arrays.deepEquals(values, header)) {
+      return null;
+    }
+    Map<String, Object> map = Maps.newHashMap();
+    for (int i = 0; i < fields.size(); i++) {
+      FixedWidthField field = fields.get(i);
+      // a record with fewer values than declared fields maps the missing ones to null
+      String value = i < values.length ? values[i] : null;
+      map.put(field.getName(), getValue(field, value));
+    }
+    return map;
+  }
+
+  /**
+   * Parses {@link #fieldDescription} into {@link #fields} and {@link #header}
+   * and creates the univocity fixed-width parser configured with the field
+   * widths and {@link #padding}.
+   *
+   * @throws IllegalArgumentException if a field entry has fewer than three
+   *           ':'-separated parts, has a non-numeric or non-positive width, or
+   *           is a DATE field without a date format
+   */
+  private void init()
+  {
+    fields = new ArrayList<FixedWidthField>();
+    List<String> headers = new ArrayList<String>();
+    List<Integer> fieldWidth = new ArrayList<Integer>();
+    for (String tmp : fieldDescription.split(",")) {
+      // split on ':' only outside double quotes so a quoted date format may contain ':'
+      String[] fieldTuple = tmp.split(":(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1);
+      if (fieldTuple.length < 3) {
+        throw new IllegalArgumentException("Invalid field description '" + tmp
+            + "'; expected [NAME]:[FIELD_TYPE]:[WIDTH]");
+      }
+      // trim so descriptions like "Year:INTEGER:4, Make:STRING:5" still work
+      String name = fieldTuple[0].trim();
+      int width = parseWidth(name, fieldTuple[2].trim());
+      FixedWidthField field = new FixedWidthField();
+      field.setName(name);
+      field.setType(fieldTuple[1].trim());
+      field.setWidth(width);
+      headers.add(name);
+      fieldWidth.add(width);
+      if (field.getType() == FIELD_TYPE.DATE) {
+        if (fieldTuple.length > 3) {
+          field.setDateFormat(fieldTuple[3].replace("\"", ""));
+        } else {
+          logger.error("Date format is missing for the field {}", field.getName());
+          throw new IllegalArgumentException("Missing date format for field " + field.getName());
+        }
+      }
+      fields.add(field);
+    }
+    header = headers.toArray(new String[headers.size()]);
+    int[] widths = Ints.toArray(fieldWidth);
+    FixedWidthFields lengths = new FixedWidthFields(header, widths);
+    FixedWidthParserSettings settings = new FixedWidthParserSettings(lengths);
+    settings.getFormat().setPadding(this.padding);
+    fixedWidthParser = new FixedWidthParser(settings);
+  }
+
+  /**
+   * Parses the WIDTH part of a field descriptor.
+   *
+   * @param fieldName name of the field, used in error messages
+   * @param width textual width from the descriptor
+   * @return the width as a positive int
+   * @throws IllegalArgumentException if the width is not a positive integer
+   */
+  private static int parseWidth(String fieldName, String width)
+  {
+    int parsed;
+    try {
+      parsed = Integer.parseInt(width);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid width '" + width + "' for field " + fieldName, e);
+    }
+    if (parsed <= 0) {
+      throw new IllegalArgumentException("Width must be positive for field " + fieldName + ", got " + parsed);
+    }
+    return parsed;
+  }
+
+  /**
+   * Converts the raw string value of a field to the Java type declared by the
+   * field's {@link FIELD_TYPE}.
+   *
+   * @param field field definition supplying the type (and date format for DATE)
+   * @param value raw value; null or empty is mapped to null
+   * @return converted value, or the raw string for STRING and any type not
+   *         handled explicitly
+   * @throws NumberFormatException if a numeric field cannot be parsed
+   * @throws RuntimeException wrapping the {@link ParseException} if a DATE value
+   *           does not match the field's date format
+   */
+  private Object getValue(FixedWidthField field, String value)
+  {
+    if (StringUtils.isEmpty(value)) {
+      return null;
+    }
+    switch (field.getType()) {
+      case BOOLEAN:
+        return Boolean.parseBoolean(value);
+      case DOUBLE:
+        return Double.parseDouble(value);
+      case INTEGER:
+        return Integer.parseInt(value);
+      case FLOAT:
+        return Float.parseFloat(value);
+      case LONG:
+        return Long.parseLong(value);
+      case SHORT:
+        return Short.parseShort(value);
+      case CHARACTER:
+        return value.charAt(0);
+      case DATE:
+        try {
+          // a fresh instance per call avoids sharing a non-thread-safe SimpleDateFormat
+          return new SimpleDateFormat(field.getDateFormat()).parse(value);
+        } catch (ParseException e) {
+          logger.error("Error parsing date for format {} and value {}", field.getDateFormat(), value);
+          throw new RuntimeException("Unable to parse value '" + value + "' of field " + field.getName()
+              + " with date format " + field.getDateFormat(), e);
+        }
+      default:
+        return value;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java index 56f9c7f..d12cdae 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java @@ -21,7 +21,6 @@ package com.datatorrent.contrib.enrich; import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.charset.Charset; import java.util.Arrays; import java.util.Date; import java.util.Map; @@ -34,6 +33,7 @@ import org.apache.commons.io.FileUtils; import com.esotericsoftware.kryo.Kryo; import com.google.common.collect.Maps; + import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; @@ -166,4 +166,71 @@ public class FileEnrichmentTest Assert.assertTrue(emitted.get("mfgDate") instanceof Date); } + @Test + public void testEnrichmentOperatorFixedWidthFSLoader() throws IOException, InterruptedException + { + URL origUrl = this.getClass().getResource("/fixed-width-sample.txt"); + MapEnricher oper = new MapEnricher(); + FixedWidthFSLoader store = new FixedWidthFSLoader(); + store.setFieldDescription( + "Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40,Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\""); + store.setHasHeader(true); + store.setPadding('_'); + store.setFileName(origUrl.toString()); + oper.setLookupFields(Arrays.asList("Year")); + oper.setIncludeFields(Arrays.asList("Year", "Make", "Model", "Price", "Date")); + oper.setStore(store); + + oper.setup(null); + + CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.output.setSink(tmp); + + oper.activate(null); + + oper.beginWindow(0); + Map<String, Object>
tuple = Maps.newHashMap(); + tuple.put("Year", 1997); + + Kryo kryo = new Kryo(); + oper.input.process(kryo.copy(tuple)); + + oper.endWindow(); + + oper.deactivate(); + oper.teardown(); + + /* Number of tuples emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + Map<String, Object> emitted = sink.collectedTuples.iterator().next(); + + /* The fields present in the original event are kept as they are */ + Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size()); + Assert.assertEquals("Value of Year is 1997", tuple.get("Year"), emitted.get("Year")); + + /* Check if Make is added to the event */ + Assert.assertEquals("Make is part of tuple", true, emitted.containsKey("Make")); + Assert.assertEquals("Value of Make", "Ford", emitted.get("Make")); + + /* Check if Model is added to the event */ + Assert.assertEquals("Model is part of tuple", true, emitted.containsKey("Model")); + Assert.assertEquals("Value of Model", "E350", emitted.get("Model")); + + /* Check if Price is added to the event */ + Assert.assertEquals("Price is part of tuple", true, emitted.containsKey("Price")); + Assert.assertEquals("Value of Price is 3000", 3000.0, emitted.get("Price")); + Assert.assertTrue(emitted.get("Price") instanceof Double); + + /* Check if Date is added to the event; the pattern is dd:MM:yyyy (MM = month, mm would be minutes) */ + Assert.assertEquals("Date is part of tuple", true, emitted.containsKey("Date")); + Date mfgDate = (Date)emitted.get("Date"); + Assert.assertEquals("value of day", 1, mfgDate.getDate()); + Assert.assertEquals("value of month", 0, mfgDate.getMonth()); + Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900); + Assert.assertTrue(emitted.get("Date") instanceof Date); + + } + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/test/resources/fixed-width-sample.txt ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/fixed-width-sample.txt
b/contrib/src/test/resources/fixed-width-sample.txt new file mode 100644 index 0000000..cd1b7b3 --- /dev/null +++ b/contrib/src/test/resources/fixed-width-sample.txt @@ -0,0 +1,6 @@ +YearMake_Model___________________________________Description_____________________________Price___Date______ +1997Ford_E350____________________________________ac, abs, moon___________________________3000.00_01:01:2016 +1999ChevyVenture "Extended Edition"______________________________________________________4900.00_01:01:2016 +1996Jeep_Grand Cherokee__________________________MUST SELL!air, moon roof, loaded_______ 4799.00_01:01:2016 +1999ChevyVenture "Extended Edition, Very Large"__________________________________________5000.00_01:01:2016 +_________Venture "Extended Edition"______________________________________________________4900.00_01:01:2016 \ No newline at end of file
