Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 49b85560c -> 9e77ef7bd
APEXMALHAR-1961 #comment enhanced existing csv parser to take in schema for validations Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9e77ef7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9e77ef7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9e77ef7b Branch: refs/heads/devel-3 Commit: 9e77ef7bd194b7c79c72f301a850ce5cac229f2c Parents: 49b8556 Author: shubham <[email protected]> Authored: Wed Dec 30 18:02:13 2015 +0530 Committer: Gaurav Gupta <[email protected]> Committed: Tue Jan 19 09:27:30 2016 -0800 ---------------------------------------------------------------------- contrib/pom.xml | 10 +- .../contrib/parser/CellProcessorBuilder.java | 456 ++++++++++++++++ .../datatorrent/contrib/parser/CsvParser.java | 362 +++++++------ .../contrib/parser/DelimitedSchema.java | 359 +++++++++++++ .../contrib/parser/CsvPOJOParserTest.java | 519 +++++++++++++++---- contrib/src/test/resources/schema.json | 96 ++++ .../com/datatorrent/lib/parser/JsonParser.java | 2 +- .../java/com/datatorrent/lib/parser/Parser.java | 4 +- .../com/datatorrent/lib/parser/XmlParser.java | 2 +- 9 files changed, 1522 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 7331940..16e28c8 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -477,7 +477,7 @@ <dependency> <groupId>net.sf.supercsv</groupId> <artifactId>super-csv</artifactId> - <version>2.2.0</version> + <version>2.4.0</version> <optional>true</optional> </dependency> <dependency> @@ -610,12 +610,6 @@ <artifactId>apex-common</artifactId> <version>${apex.core.version}</version> <type>jar</type> - </dependency> - <dependency> - 
<!-- required by Csv parser and formatter --> - <groupId>net.sf.supercsv</groupId> - <artifactId>super-csv-joda</artifactId> - <version>2.3.1</version> - </dependency> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java new file mode 100644 index 0000000..1993d94 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java @@ -0,0 +1,456 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.parser; + +import java.util.Map; + +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ParseBool; +import org.supercsv.cellprocessor.ParseChar; +import org.supercsv.cellprocessor.ParseDate; +import org.supercsv.cellprocessor.ParseDouble; +import org.supercsv.cellprocessor.ParseInt; +import org.supercsv.cellprocessor.ParseLong; +import org.supercsv.cellprocessor.constraint.DMinMax; +import org.supercsv.cellprocessor.constraint.Equals; +import org.supercsv.cellprocessor.constraint.LMinMax; +import org.supercsv.cellprocessor.constraint.StrMinMax; +import org.supercsv.cellprocessor.constraint.StrRegEx; +import org.supercsv.cellprocessor.constraint.Strlen; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.cellprocessor.ift.DoubleCellProcessor; +import org.supercsv.cellprocessor.ift.LongCellProcessor; +import org.supercsv.util.CsvContext; + +import org.apache.commons.lang3.StringUtils; + +import com.datatorrent.contrib.parser.DelimitedSchema.FieldType; + +/** + * Helper class with methods to generate CellProcessor objects. Cell processors + * are an integral part of reading and writing with Super CSV - they automate + * the data type conversions, and enforce constraints. They implement the chain + * of responsibility design pattern - each processor has a single, well-defined + * purpose and can be chained together with other processors to fully automate + * all of the required conversions and constraint validation for a single + * delimited record. 
+ * + */ +public class CellProcessorBuilder +{ + + /** + * Method to get cell processors for given field type and constraints + * + * @param fieldType + * data type of the field + * @param constraints + * a map of constraints + * @return + */ + public static CellProcessor getCellProcessor(FieldType fieldType, Map<String, Object> constraints) + { + switch (fieldType) { + case STRING: + return getStringCellProcessor(constraints); + case INTEGER: + return getIntegerCellProcessor(constraints); + case LONG: + return getLongCellProcessor(constraints); + case FLOAT: + case DOUBLE: + return getDoubleCellProcessor(constraints); + case CHARACTER: + return getCharCellProcessor(constraints); + case BOOLEAN: + return getBooleanCellProcessor(constraints); + case DATE: + return getDateCellProcessor(constraints); + default: + return null; + } + } + + /** + * Method to get cellprocessor for String with constraints. These constraints + * are evaluated against the String field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to String + * @return CellProcessor + */ + private static CellProcessor getStringCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + Integer strLen = constraints.get(DelimitedSchema.LENGTH) == null ? null : Integer.parseInt((String)constraints + .get(DelimitedSchema.LENGTH)); + Integer minLength = constraints.get(DelimitedSchema.MIN_LENGTH) == null ? null : Integer + .parseInt((String)constraints.get(DelimitedSchema.MIN_LENGTH)); + Integer maxLength = constraints.get(DelimitedSchema.MAX_LENGTH) == null ? null : Integer + .parseInt((String)constraints.get(DelimitedSchema.MAX_LENGTH)); + String equals = constraints.get(DelimitedSchema.EQUALS) == null ? 
null : (String)constraints + .get(DelimitedSchema.EQUALS); + String pattern = constraints.get(DelimitedSchema.REGEX_PATTERN) == null ? null : (String)constraints + .get(DelimitedSchema.REGEX_PATTERN); + + CellProcessor cellProcessor = null; + if (StringUtils.isNotBlank(equals)) { + cellProcessor = new Equals(equals); + } else if (StringUtils.isNotBlank(pattern)) { + cellProcessor = new StrRegEx(pattern); + } else if (strLen != null) { + cellProcessor = new Strlen(strLen); + } else if (maxLength != null || minLength != null) { + Long min = minLength == null ? 0L : minLength; + Long max = maxLength == null ? LMinMax.MAX_LONG : maxLength; + cellProcessor = new StrMinMax(min, max); + } + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Integer with constraints. These constraints + * are evaluated against the Integer field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to Integer + * @return CellProcessor + */ + private static CellProcessor getIntegerCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + Integer equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Integer.parseInt((String)constraints + .get(DelimitedSchema.EQUALS)); + Integer minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Integer.parseInt((String)constraints + .get(DelimitedSchema.MIN_VALUE)); + Integer maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? 
null : Integer.parseInt((String)constraints + .get(DelimitedSchema.MAX_VALUE)); + + CellProcessor cellProcessor = null; + if (equals != null) { + cellProcessor = new Equals(equals); + cellProcessor = addParseInt(cellProcessor); + } else if (minValue != null || maxValue != null) { + cellProcessor = addIntMinMax(minValue, maxValue); + } else { + cellProcessor = addParseInt(null); + } + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Long with constraints. These constraints + * are evaluated against the Long field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to Long + * @return CellProcessor + */ + private static CellProcessor getLongCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + Long equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Long.parseLong((String)constraints + .get(DelimitedSchema.EQUALS)); + Long minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Long.parseLong((String)constraints + .get(DelimitedSchema.MIN_VALUE)); + Long maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? null : Long.parseLong((String)constraints + .get(DelimitedSchema.MAX_VALUE)); + CellProcessor cellProcessor = null; + if (equals != null) { + cellProcessor = new Equals(equals); + cellProcessor = addParseLong(cellProcessor); + } else if (minValue != null || maxValue != null) { + cellProcessor = addLongMinMax(minValue, maxValue); + } else { + cellProcessor = addParseLong(null); + } + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Float/Double with constraints. 
These + * constraints are evaluated against the Float/Double field for which this + * cellprocessor is defined. + * + * @param constraints + * map of constraints applicable to Float/Double + * @return CellProcessor + */ + private static CellProcessor getDoubleCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + Double equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Double.parseDouble((String)constraints + .get(DelimitedSchema.EQUALS)); + Double minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Double + .parseDouble((String)constraints.get(DelimitedSchema.MIN_VALUE)); + Double maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? null : Double + .parseDouble((String)constraints.get(DelimitedSchema.MAX_VALUE)); + CellProcessor cellProcessor = null; + if (equals != null) { + cellProcessor = new Equals(equals); + cellProcessor = addParseDouble(cellProcessor); + } else if (minValue != null || maxValue != null) { + cellProcessor = addDoubleMinMax(minValue, maxValue); + } else { + cellProcessor = addParseDouble(null); + } + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Boolean with constraints. These constraints + * are evaluated against the Boolean field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to Boolean + * @return CellProcessor + */ + private static CellProcessor getBooleanCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + String trueValue = constraints.get(DelimitedSchema.TRUE_VALUE) == null ? 
null : (String)constraints + .get(DelimitedSchema.TRUE_VALUE); + String falseValue = constraints.get(DelimitedSchema.FALSE_VALUE) == null ? null : (String)constraints + .get(DelimitedSchema.FALSE_VALUE); + CellProcessor cellProcessor = null; + if (StringUtils.isNotBlank(trueValue) && StringUtils.isNotBlank(falseValue)) { + cellProcessor = new ParseBool(trueValue, falseValue); + } else { + cellProcessor = new ParseBool(); + } + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Date with constraints. These constraints + * are evaluated against the Date field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to Date + * @return CellProcessor + */ + private static CellProcessor getDateCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + String format = constraints.get(DelimitedSchema.DATE_FORMAT) == null ? null : (String)constraints + .get(DelimitedSchema.DATE_FORMAT); + CellProcessor cellProcessor = null; + String fmt = StringUtils.isNotBlank(format) ? format : "dd/MM/yyyy"; + cellProcessor = new ParseDate(fmt, false); + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Method to get cellprocessor for Char with constraints. These constraints + * are evaluated against the Char field for which this cellprocessor is + * defined. + * + * @param constraints + * map of constraints applicable to Char + * @return CellProcessor + */ + private static CellProcessor getCharCellProcessor(Map<String, Object> constraints) + { + Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? 
null : Boolean + .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED)); + Character equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : ((String)constraints + .get(DelimitedSchema.EQUALS)).charAt(0); + + CellProcessor cellProcessor = null; + if (equals != null) { + cellProcessor = new Equals(equals); + } + cellProcessor = addParseChar(cellProcessor); + if (required == null || !required) { + cellProcessor = addOptional(cellProcessor); + } + return cellProcessor; + } + + /** + * Get a Double Min Max cellprocessor. + * + * @param minValue + * minimum value. + * @param maxValue + * maximum value. + * @return CellProcessor + */ + private static CellProcessor addDoubleMinMax(Double minValue, Double maxValue) + { + Double min = minValue == null ? DMinMax.MIN_DOUBLE : minValue; + Double max = maxValue == null ? DMinMax.MAX_DOUBLE : maxValue; + return new DMinMax(min, max); + } + + /** + * Get a Long Min Max cellprocessor. + * + * @param minValue + * minimum value. + * @param maxValue + * maximum value. + * @return CellProcessor + */ + private static CellProcessor addLongMinMax(Long minValue, Long maxValue) + { + Long min = minValue == null ? LMinMax.MIN_LONG : minValue; + Long max = maxValue == null ? LMinMax.MAX_LONG : maxValue; + return new LMinMax(min, max); + } + + /** + * Get a Int Min Max cellprocessor. + * + * @param minValue + * minimum value. + * @param maxValue + * maximum value. + * @return CellProcessor + */ + private static CellProcessor addIntMinMax(Integer minValue, Integer maxValue) + { + Integer min = minValue == null ? Integer.MIN_VALUE : minValue; + Integer max = maxValue == null ? Integer.MAX_VALUE : maxValue; + return new IntMinMax(min, max); + } + + /** + * Get Optional cellprocessor which means field is not mandatory. + * + * @param cellProcessor + * next processor in the chain. 
+ * @return CellProcessor + */ + private static CellProcessor addOptional(CellProcessor cellProcessor) + { + if (cellProcessor == null) { + return new Optional(); + } + return new Optional(cellProcessor); + } + + /** + * Get cellprocessor to parse String as Integer. + * + * @param cellProcessor + * next processor in the chain. + * @return CellProcessor + */ + private static CellProcessor addParseInt(CellProcessor cellProcessor) + { + if (cellProcessor == null) { + return new ParseInt(); + } + return new ParseInt((LongCellProcessor)cellProcessor); + } + + /** + * Get cellprocessor to parse String as Long. + * + * @param cellProcessor + * next processor in the chain. + * @return CellProcessor + */ + private static CellProcessor addParseLong(CellProcessor cellProcessor) + { + if (cellProcessor == null) { + return new ParseLong(); + } + return new ParseLong((LongCellProcessor)cellProcessor); + } + + /** + * Get cellprocessor to parse String as Double. + * + * @param cellProcessor + * next processor in the chain. + * @return CellProcessor + */ + private static CellProcessor addParseDouble(CellProcessor cellProcessor) + { + if (cellProcessor == null) { + return new ParseDouble(); + } + return new ParseDouble((DoubleCellProcessor)cellProcessor); + } + + /** + * Get cellprocessor to parse String as Character. + * + * @param cellProcessor + * next processor in the chain. 
+ * @return CellProcessor + */ + private static CellProcessor addParseChar(CellProcessor cellProcessor) + { + if (cellProcessor == null) { + return new ParseChar(); + } + return new ParseChar((DoubleCellProcessor)cellProcessor); + } + + /** + * Custom Cell processor to handle min max constraints for Integers + */ + private static class IntMinMax extends LMinMax + { + public IntMinMax(int min, int max) + { + super(min, max); + } + + @Override + public Object execute(Object value, CsvContext context) + { + Long result = (Long)super.execute(value, context); + return result.intValue(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java index af1eebe..bb72e82 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java @@ -19,287 +19,271 @@ package com.datatorrent.contrib.parser; import java.io.IOException; -import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.supercsv.cellprocessor.Optional; -import org.supercsv.cellprocessor.ParseBool; -import org.supercsv.cellprocessor.ParseChar; -import org.supercsv.cellprocessor.ParseDate; -import org.supercsv.cellprocessor.ParseDouble; -import org.supercsv.cellprocessor.ParseInt; -import org.supercsv.cellprocessor.ParseLong; import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.exception.SuperCsvException; import org.supercsv.io.CsvBeanReader; +import org.supercsv.io.CsvMapReader; import org.supercsv.prefs.CsvPreference; +import org.apache.commons.lang.StringUtils; 
+import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.contrib.parser.DelimitedSchema.Field; import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.ReusableStringReader; import com.datatorrent.netlet.util.DTThrowable; /** - * Operator that converts CSV string to Pojo <br> + * Operator that parses a delimited tuple against a specified schema <br> + * Schema is specified in a json format as per {@link DelimitedSchema} that + * contains field information and constraints for each field.<br> * Assumption is that each field in the delimited data should map to a simple * java type.<br> * <br> * <b>Properties</b> <br> - * <b>fieldInfo</b>:User need to specify fields and their types as a comma - * separated string having format <NAME>:<TYPE>|<FORMAT> in - * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as - * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br> - * <b>fieldDelimiter</b>: Default is comma <br> - * <b>lineDelimiter</b>: Default is '\r\n' + * <b>schema</b>:schema as a string<br> + * <b>clazz</b>:Pojo class <br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a byte array. Each tuple represents a record<br> + * <b>parsedOutput</b>:tuples that are validated against the schema are emitted + * as Map<String,Object> on this port<br> + * <b>out</b>:tuples that are validated against the schema are emitted as pojo + * on this port<br> + * <b>err</b>:tuples that do not conform to schema are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason. 
* * @displayName CsvParser * @category Parsers * @tags csv pojo parser * @since 3.2.0 */ -public class CsvParser extends Parser<String, String> +@InterfaceStability.Evolving +public class CsvParser extends Parser<byte[], KeyValPair<String, String>> { - - private ArrayList<Field> fields; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; - + /** + * Map Reader to read delimited records + */ + private transient CsvMapReader csvMapReader; + /** + * Bean Reader to read delimited records + */ + private transient CsvBeanReader csvBeanReader; + /** + * Reader used by csvMapReader and csvBeanReader + */ + private transient ReusableStringReader csvStringReader; + /** + * Contents of the schema. Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; - - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - private transient CsvBeanReader csvReader; - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. 
+ */ + private transient CellProcessor[] processors; + /** + * Names of all the fields in the same order of incoming records + */ + private transient String[] nameMapping; + /** + * header-this will be delimiter separated string of field names + */ + private transient String header; + /** + * Reading preferences that are passed through schema + */ + private transient CsvPreference preference; - @NotNull - private transient ReusableStringReader csvStringReader = new ReusableStringReader(); + /** + * metric to keep count of number of tuples emitted on {@link #parsedOutput} + * port + */ + @AutoMetric + long parsedOutputCount; - public CsvParser() + @Override + public void beginWindow(long windowId) { - fields = new ArrayList<Field>(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; + super.beginWindow(windowId); + parsedOutputCount = 0; } @Override public void setup(OperatorContext context) { - super.setup(context); - - logger.info("field info {}", fieldInfo); - fields = new ArrayList<Field>(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - - CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); - csvReader = new CsvBeanReader(csvStringReader, preference); - int countKeyValue = getFields().size(); - logger.info("countKeyValue {}", countKeyValue); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); + delimitedParserSchema = new DelimitedSchema(schema); + preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), + delimitedParserSchema.getDelimiterChar(), 
delimitedParserSchema.getLineDelimiter()).build(); + nameMapping = delimitedParserSchema.getFieldNames().toArray( + new String[delimitedParserSchema.getFieldNames().size()]); + header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + ""); + processors = getProcessor(delimitedParserSchema.getFields()); + csvStringReader = new ReusableStringReader(); + csvMapReader = new CsvMapReader(csvStringReader, preference); + csvBeanReader = new CsvBeanReader(csvStringReader, preference); } - private void initialise(String[] nameMapping, CellProcessor[] processors) - { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DOUBLE) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.INTEGER) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.FLOAT) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.LONG) { - processors[i] = new Optional(new ParseLong()); - } else if (type == FIELD_TYPE.SHORT) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.STRING) { - processors[i] = new Optional(); - } else if (type == FIELD_TYPE.CHARACTER) { - processors[i] = new Optional(new ParseChar()); - } else if (type == FIELD_TYPE.BOOLEAN) { - processors[i] = new Optional(new ParseBool()); - } else if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new ParseDate(dateFormat == null ? 
"dd/MM/yyyy" : dateFormat)); - } - } - } @Override - public Object convert(String tuple) + public Object convert(byte[] tuple) { - try { - csvStringReader.open(tuple); - return csvReader.read(clazz, nameMapping, processors); - } catch (IOException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - return null; - } + throw new UnsupportedOperationException("Not supported"); } @Override - public void teardown() + public void processTuple(byte[] tuple) { - try { - if (csvReader != null) { - csvReader.close(); + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(null, "Blank/null tuple")); } - } catch (IOException e) { - DTThrowable.rethrow(e); - } - } - - @Override - public String processErorrTuple(String input) - { - return input; - } - - public static class Field - { - String name; - String format; - FIELD_TYPE type; - - public String getName() - { - return name; + errorTupleCount++; + return; } - - public void setName(String name) - { - this.name = name; - } - - public FIELD_TYPE getType() - { - return type; - } - - public void setType(String type) - { - this.type = FIELD_TYPE.valueOf(type); + String incomingString = new String(tuple); + if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple")); + } + errorTupleCount++; + return; } + try { + if (parsedOutput.isConnected()) { + csvStringReader.open(incomingString); + Map<String, Object> map = csvMapReader.read(nameMapping, processors); + parsedOutput.emit(map); + parsedOutputCount++; + } - public String getFormat() - { - return format; - } + if (out.isConnected() && clazz != null) { + csvStringReader.open(incomingString); + Object obj = csvBeanReader.read(clazz, nameMapping, processors); + out.emit(obj); + emittedObjectCount++; + } - public void setFormat(String format) - { - this.format = format; + } catch 
(SuperCsvException | IOException e) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, e.getMessage())); + } + errorTupleCount++; + logger.error("Tuple could not be parsed. Reason {}", e.getMessage()); } - } - /** - * Gets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @return An array list of Fields. - */ - public ArrayList<Field> getFields() + @Override + public KeyValPair<String, String> processErrorTuple(byte[] input) { - return fields; + throw new UnsupportedOperationException("Not supported"); } /** - * Sets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @param fields - * An array list of Fields. + * Returns array of cellprocessors, one for each field */ - public void setFields(ArrayList<Field> fields) + private CellProcessor[] getProcessor(List<Field> fields) { - this.fields = fields; + CellProcessor[] processor = new CellProcessor[fields.size()]; + int fieldCount = 0; + for (Field field : fields) { + processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints()); + } + return processor; } - /** - * Gets the delimiter which separates fields in incoming data. - * - * @return fieldDelimiter - */ - public int getFieldDelimiter() + @Override + public void teardown() { - return fieldDelimiter; + try { + csvMapReader.close(); + } catch (IOException e) { + logger.error("Error while closing csv map reader {}", e.getMessage()); + DTThrowable.wrapIfChecked(e); + } + try { + csvBeanReader.close(); + } catch (IOException e) { + logger.error("Error while closing csv bean reader {}", e.getMessage()); + DTThrowable.wrapIfChecked(e); + } } /** - * Sets the delimiter which separates fields in incoming data. 
+ * Get the schema * - * @param fieldDelimiter + * @return */ - public void setFieldDelimiter(int fieldDelimiter) + public String getSchema() { - this.fieldDelimiter = fieldDelimiter; + return schema; } /** - * Gets the delimiter which separates lines in incoming data. + * Set the schema * - * @return lineDelimiter + * @param schema */ - public String getLineDelimiter() + public void setSchema(String schema) { - return lineDelimiter; + this.schema = schema; } /** - * Sets the delimiter which separates line in incoming data. + * Get errorTupleCount * - * @param lineDelimiter + * @return errorTupleCount */ - public void setLineDelimiter(String lineDelimiter) + @VisibleForTesting + public long getErrorTupleCount() { - this.lineDelimiter = lineDelimiter; + return errorTupleCount; } /** - * Gets the name of the fields with type and format ( for date ) as comma - * separated string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * Get emittedObjectCount * - * @return fieldInfo + * @return emittedObjectCount */ - public String getFieldInfo() + @VisibleForTesting + public long getEmittedObjectCount() { - return fieldInfo; + return emittedObjectCount; } /** - * Sets the name of the fields with type and format ( for date ) as comma - * separated string in same order as incoming data. 
e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * Get incomingTuplesCount * - * @param fieldInfo + * @return incomingTuplesCount */ - public void setFieldInfo(String fieldInfo) + @VisibleForTesting + public long getIncomingTuplesCount() { - this.fieldInfo = fieldInfo; + return incomingTuplesCount; } + /** + * output port to emit validate records as map + */ + public final transient DefaultOutputPort<Map<String, Object>> parsedOutput = new DefaultOutputPort<Map<String, Object>>(); private static final Logger logger = LoggerFactory.getLogger(CsvParser.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java new file mode 100644 index 0000000..19f6a4b --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.parser; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * This is schema that defines fields and their constraints for delimited files. + * The operators use this information to validate the incoming tuples. + * Information from JSON schema is saved in this object and is used by the + * operators + * </p> + * <br> + * <br> + * Example schema <br> + * <br> + * {@code { "separator": ",", "quoteChar":"\"", "fields": [ { "name": "adId", + * "type": "Integer", "constraints": { "required": "true" } }, { "name": "adName", + * "type": "String", "constraints": { "required": "true", "pattern": + * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type": + * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue": + * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format": + * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": { + * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type": + * "Boolean", "constraints": { "required": "true" } } ] }} + */ +public class DelimitedSchema +{ + + /** + * JSON key string for separator + */ + private static final String SEPARATOR = "separator"; + /** + * JSON key string for quote character + */ + private static final String QUOTE_CHAR = "quoteChar"; + /** + * JSON key string for line delimiter + */ + private static final String LINE_DELIMITER = "lineDelimiter"; + /** + * JSON key string for fields array + */ + private static final String FIELDS = "fields"; + /** + * JSON key string for name of the field within fields array + */ + private 
static final String NAME = "name"; + /** + * JSON key string for type of the field within fields array + */ + private static final String TYPE = "type"; + /** + * JSON key string for constraints for each field + */ + private static final String CONSTRAINTS = "constraints"; + /** + * JSON key string for required constraint + */ + public static final String REQUIRED = "required"; + /** + * JSON key string for equals constraint + */ + public static final String EQUALS = "equals"; + /** + * JSON key string for length constraint + */ + public static final String LENGTH = "length"; + /** + * JSON key string for min length constraint + */ + public static final String MIN_LENGTH = "minLength"; + /** + * JSON key string for max length constraint + */ + public static final String MAX_LENGTH = "maxLength"; + /** + * JSON key string for min value constraint + */ + public static final String MIN_VALUE = "minValue"; + /** + * JSON key string for max value constraint + */ + public static final String MAX_VALUE = "maxValue"; + /** + * JSON key string for regex pattern constraint + */ + public static final String REGEX_PATTERN = "pattern"; + /** + * JSON key string for date format constraint + */ + public static final String DATE_FORMAT = "format"; + /** + * JSON key string for locale constraint + */ + public static final String LOCALE = "locale"; + /** + * JSON key string for true value constraint + */ + public static final String TRUE_VALUE = "trueValue"; + /** + * JSON key string for false value constraint + */ + public static final String FALSE_VALUE = "falseValue"; + /** + * delimiter character provided in schema. Default is , + */ + private int delimiterChar = ','; + /** + * quote character provided in schema. Default is " + */ + private char quoteChar = '\"'; + /** + * line delimiter character provided in schema. 
Default is "\r\n" (carriage return followed by line feed) + */ + private String lineDelimiter = "\r\n"; + /** + * This holds the list of field names in the same order as in the schema + */ + private List<String> fieldNames = new LinkedList<String>(); + /** + * This holds the list of {@link Field} objects in the same order as in the schema + */ + private List<Field> fields = new LinkedList<Field>(); + + /** + * Supported data types + */ + public enum FieldType + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE + }; + + public DelimitedSchema(String json) + { + try { + initialize(json); + } catch (JSONException | IOException e) { + logger.error("{}", e); + throw new IllegalArgumentException(e); + } + } + + /** + * For a given json string, this method sets the field members + * + * @param json the schema definition as a JSON string + * @throws JSONException if the schema JSON cannot be parsed or read + * @throws IOException if the constraints of a field cannot be read into a map + */ + private void initialize(String json) throws JSONException, IOException + { + JSONObject jo = new JSONObject(json); + if (jo.has(SEPARATOR)) { + delimiterChar = ((String)jo.getString(SEPARATOR)).charAt(0); + } + if (jo.has(QUOTE_CHAR)) { + quoteChar = ((String)jo.getString(QUOTE_CHAR)).charAt(0); + } + if (jo.has(LINE_DELIMITER)) { + lineDelimiter = (String)jo.getString(LINE_DELIMITER); + } + + JSONArray fieldArray = jo.getJSONArray(FIELDS); + for (int i = 0; i < fieldArray.length(); i++) { + JSONObject obj = fieldArray.getJSONObject(i); + Field field = new Field(obj.getString(NAME), obj.getString(TYPE)); + fields.add(field); + fieldNames.add(field.name); + if (obj.has(CONSTRAINTS)) { + JSONObject constraints = obj.getJSONObject(CONSTRAINTS); + field.constraints = new ObjectMapper().readValue(constraints.toString(), HashMap.class); + } + } + } + + /** + * Get the list of field names mentioned in schema + * + * @return fieldNames + */ + public List<String> getFieldNames() + { + return Collections.unmodifiableList(fieldNames); + } + + /** + * Get the delimiter character + * + * @return delimiterChar + */ + public int getDelimiterChar() + { + return delimiterChar; + } + + /**
+ * Get the quoteChar + * + * @return quoteChar + */ + public char getQuoteChar() + { + return quoteChar; + } + + /** + * Get the line delimiter + * + * @return lineDelimiter + */ + public String getLineDelimiter() + { + return lineDelimiter; + } + + @Override + public String toString() + { + return "DelimitedSchema [delimiterChar=" + delimiterChar + ", quoteChar=" + quoteChar + ", lineDelimiter=" + + lineDelimiter + ", fieldNames=" + fieldNames + ", fields=" + fields + "]"; + } + + /** + * Get the list of Fields. + * + * @return fields + */ + public List<Field> getFields() + { + return Collections.unmodifiableList(fields); + } + + /** + * Objects of this class represent a particular field in the schema. Each + * field has a name, a type and a set of associated constraints. + * + */ + public class Field + { + /** + * name of the field + */ + String name; + /** + * Data type of the field + */ + FieldType type; + /** + * constraints associated with the field + */ + Map<String, Object> constraints = new HashMap<String, Object>(); + + public Field(String name, String type) + { + this.name = name; + this.type = FieldType.valueOf(type.toUpperCase()); + } + + /** + * Get the name of the field + * + * @return name + */ + public String getName() + { + return name; + } + + /** + * Set the name of the field + * + * @param name + */ + public void setName(String name) + { + this.name = name; + } + + /** + * Get {@link FieldType} + * + * @return type + */ + public FieldType getType() + { + return type; + } + + /** + * Set {@link FieldType} + * + * @param type + */ + public void setType(FieldType type) + { + this.type = type; + } + + /** + * Get the map of constraints associated with the field + * + * @return constraints + */ + public Map<String, Object> getConstraints() + { + return constraints; + } + + /** + * Sets the map of constraints associated with the field + * + * @param constraints + */ + public void setConstraints(Map<String, Object> constraints) + { + this.constraints
= constraints; + } + + @Override + public String toString() + { + return "Fields [name=" + name + ", type=" + type + ", constraints=" + constraints + "]"; + } + } + + private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java index c9a4179..dc77aa8 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java @@ -20,169 +20,514 @@ package com.datatorrent.contrib.parser; import java.util.Date; -import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; public class CsvPOJOParserTest { - CsvParser operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; + private static final String filename = "schema.json"; + CollectorTestSink<Object> error = new CollectorTestSink<Object>(); + CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>(); + CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>(); + CsvParser parser = new CsvParser(); @Rule public Watcher watcher = new Watcher(); public class Watcher extends TestWatcher { - @Override protected void starting(Description description) { super.starting(description); - operator = new CsvParser(); - operator.setClazz(EmployeeBean.class); - 
operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); + parser.setClazz(Ad.class); + parser.setSchema(SchemaUtils.jarResourceFileToString(filename)); + parser.setup(null); + parser.err.setSink(error); + parser.parsedOutput.setSink(objectPort); + parser.out.setSink(pojoPort); } @Override protected void finished(Description description) { super.finished(description); - operator.teardown(); + error.clear(); + objectPort.clear(); + pojoPort.clear(); + parser.teardown(); } - } + /* + * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid + * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes + * Constraints are defined in schema.json + */ @Test - public void testCsvToPojoWriterDefault() + public void TestParserValidInput() { - operator.setup(null); - String tuple = "john,cs,1,01/01/2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Ad adPojo = (Ad)obj; Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - 
Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfJoining())); + Assert.assertEquals(Ad.class, obj.getClass()); + Assert.assertEquals(1234, adPojo.getAdId()); + Assert.assertTrue("adxyz".equals(adPojo.getAdName())); + Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0); + Assert.assertEquals(Date.class, adPojo.getStartDate().getClass()); + Assert.assertEquals(Date.class, adPojo.getEndDate().getClass()); + Assert.assertEquals(12, adPojo.getSecurityCode()); + Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign())); + Assert.assertTrue(adPojo.isActive()); + Assert.assertFalse(adPojo.isOptimized()); + Assert.assertTrue("yes".equals(adPojo.getValid())); } @Test - public void testCsvToPojoWriterDateFormat() + public void TestParserValidInputPojoPortNotConnected() { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); - operator.setup(null); - String tuple = "john,cs,1,01-JAN-2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); + parser.out.setSink(null); + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + } + + @Test + public void TestParserValidInputClassNameNotProvided() + { + parser.setClazz(null); + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + 
Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidAdIdInput() + { + String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserNoCampaignIdInput() + { + String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfJoining())); + Assert.assertEquals(Ad.class, obj.getClass()); + Assert.assertEquals(0, error.collectedTuples.size()); } @Test - public void testCsvToPojoWriterDateFormatMultiple() + public void TestParserInvalidCampaignIdInput() { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); - operator.setup(null); - String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); + String input = "1234,9833,adxyz,0.2,2015-03-08 
03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidAdNameInput() + { + String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidBidPriceInput() + { + String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidStartDateInput() + { + String input = "1234,98233,adxyz,0.2,2015-30-08 02:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidSecurityCodeInput() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,85,y,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test 
+ public void TestParserInvalidisActiveInput() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,yo,,CAMP_AD,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInvalidParentCampaignInput() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserValidisOptimized() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes"; + parser.in.process(input.getBytes()); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfJoining())); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfBirth())); + Assert.assertEquals(Ad.class, obj.getClass()); + Assert.assertEquals(0, error.collectedTuples.size()); + } + + @Test + public void TestParserInValidisOptimized() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 
03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP,Y,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserInValidWeatherTargeting() + { + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,NO,yes"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserNullOrBlankInput() + { + parser.beginWindow(0); + parser.in.process(null); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserHeaderAsInput() + { + String input = "adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid"; + parser.beginWindow(0); + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestParserLessFields() + { + parser.beginWindow(0); + parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION".getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); } - public static class EmployeeBean + @Test + public void TestParserMoreFields() { - private String 
name; - private String dept; - private int eid; - private Date dateOfJoining; - private Date dateOfBirth; + parser.beginWindow(0); + parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField" + .getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } - public String getName() + @Test + public void TestParserValidInputMetricVerification() + { + parser.beginWindow(0); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + } + + @Test + public void TestParserInvalidInputMetricVerification() + { + parser.beginWindow(0); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField" + .getBytes()); + parser.endWindow(); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(1, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + } + + @Test + public void TestParserValidInputMetricResetCheck() + { + parser.beginWindow(0); + 
Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes"; + parser.in.process(input.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + parser.beginWindow(1); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + parser.in.process(input.getBytes()); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + parser.endWindow(); + } + + public static class Ad + { + + private int adId; + private int campaignId; + private String adName; + private double bidPrice; + private Date startDate; + private Date endDate; + private long securityCode; + private boolean active; + private boolean optimized; + private String parentCampaign; + private Character weatherTargeted; + private String valid; + + public Ad() + { + + } + + public int getAdId() + { + return adId; + } + + public void setAdId(int adId) + { + this.adId = adId; + } + + public int getCampaignId() + { + return campaignId; + } + + public void setCampaignId(int campaignId) + { + this.campaignId = campaignId; + } + + public String getAdName() + { + return adName; + } + + public void setAdName(String adName) + { + this.adName = adName; + } + + public double getBidPrice() + { + return bidPrice; + } + + public void setBidPrice(double 
bidPrice) + { + this.bidPrice = bidPrice; + } + + public Date getStartDate() + { + return startDate; + } + + public void setStartDate(Date startDate) + { + this.startDate = startDate; + } + + public Date getEndDate() { - return name; + return endDate; } - public void setName(String name) + public void setEndDate(Date endDate) { - this.name = name; + this.endDate = endDate; } - public String getDept() + public long getSecurityCode() { - return dept; + return securityCode; } - public void setDept(String dept) + public void setSecurityCode(long securityCode) { - this.dept = dept; + this.securityCode = securityCode; } - public int getEid() + public boolean isActive() { - return eid; + return active; } - public void setEid(int eid) + public void setActive(boolean active) { - this.eid = eid; + this.active = active; } - public Date getDateOfJoining() + public boolean isOptimized() { - return dateOfJoining; + return optimized; } - public void setDateOfJoining(Date dateOfJoining) + public void setOptimized(boolean optimized) { - this.dateOfJoining = dateOfJoining; + this.optimized = optimized; } - public Date getDateOfBirth() + public String getParentCampaign() { - return dateOfBirth; + return parentCampaign; } - public void setDateOfBirth(Date dateOfBirth) + public void setParentCampaign(String parentCampaign) + { + this.parentCampaign = parentCampaign; + } + + public Character getWeatherTargeted() + { + return weatherTargeted; + } + + public void setWeatherTargeted(Character weatherTargeted) + { + this.weatherTargeted = weatherTargeted; + } + + public String getValid() + { + return valid; + } + + public void setValid(String valid) + { + this.valid = valid; + } + + @Override + public String toString() { - this.dateOfBirth = dateOfBirth; + return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice + + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active=" + + active + ", 
optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted=" + + weatherTargeted + ", valid=" + valid + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/resources/schema.json ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/schema.json b/contrib/src/test/resources/schema.json new file mode 100644 index 0000000..13e2789 --- /dev/null +++ b/contrib/src/test/resources/schema.json @@ -0,0 +1,96 @@ +{ + "separator": ",", + "quoteChar":"\"", + "fields": [ + { + "name": "adId", + "type": "Integer", + "constraints": { + "required": "true" + } + }, + { + "name": "campaignId", + "type": "Integer", + "constraints": { + "equals": "98233" + } + }, + { + "name": "adName", + "type": "String", + "constraints": { + "required": "true", + "pattern": "[a-z].*[a-z]$", + "maxLength": "10" + } + }, + { + "name": "bidPrice", + "type": "Double", + "constraints": { + "required": "true", + "minValue": "0.1", + "maxValue": "3.2" + } + }, + { + "name": "startDate", + "type": "Date", + "constraints": { + "format": "yyyy-MM-dd HH:mm:ss", + "locale":"IN" + } + }, + { + "name": "endDate", + "type": "Date", + "constraints": { + "format": "dd/MM/yyyy" + } + }, + { + "name": "securityCode", + "type": "Long", + "constraints": { + "minValue": "10", + "maxValue": "30" + } + }, + { + "name": "active", + "type": "Boolean", + "constraints": { + "required": "true" + } + }, + { + "name": "optimized", + "type": "Boolean", + "constraints": { + "trueValue":"OPTIMIZE", + "falseValue":"NO_OPTIMIZE" + } + }, + { + "name": "parentCampaign", + "type": "String", + "constraints": { + "required": "true", + "equals": "CAMP_AD" + } + }, + { + "name": "weatherTargeted", + "type": "Character", + "constraints": { + "required": "true", + "equals": "Y" + } + }, + { + "name": "valid", + "type": "String" + } + ] +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java index 9e8ee0f..4e9800a 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java @@ -81,7 +81,7 @@ public class JsonParser extends Parser<String, String> } @Override - public String processErorrTuple(String input) + public String processErrorTuple(String input) { return input; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/Parser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java index 5bcc1c5..4f591f1 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java @@ -82,7 +82,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co Object tuple = convert(inputTuple); if (tuple == null && err.isConnected()) { errorTupleCount++; - err.emit(processErorrTuple(inputTuple)); + err.emit(processErrorTuple(inputTuple)); return; } if (out.isConnected()) { @@ -91,7 +91,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co } } - public abstract ERROROUT processErorrTuple(INPUT input); + public abstract ERROROUT processErrorTuple(INPUT input); @Override public void beginWindow(long windowId) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java index 3d416e1..c8eeacc 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java @@ -123,7 +123,7 @@ public class XmlParser extends Parser<String, String> } @Override - public String processErorrTuple(String input) + public String processErrorTuple(String input) { return input; }
