Repository: apex-malhar Updated Branches: refs/heads/master 16b15c21b -> 0852c6594
APEXMALHAR-2259 Code changes to add fixed width parser Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0852c659 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0852c659 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0852c659 Branch: refs/heads/master Commit: 0852c659400a2bd484a72d4539cdbaced1c5239f Parents: 16b15c2 Author: Hitesh-Scorpio <[email protected]> Authored: Fri Sep 23 11:43:16 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Tue Jan 10 16:30:50 2017 +0530 ---------------------------------------------------------------------- .../contrib/formatter/CsvFormatter.java | 2 +- .../contrib/parser/CellProcessorBuilder.java | 2 +- .../contrib/parser/DelimitedSchema.java | 131 +---- .../contrib/parser/FixedWidthParser.java | 493 +++++++++++++++++ .../contrib/parser/FixedWidthSchema.java | 379 +++++++++++++ .../com/datatorrent/contrib/parser/Schema.java | 176 +++++++ .../contrib/parser/FixedWidthTest.java | 526 +++++++++++++++++++ .../src/test/resources/FixedWidthSchema.json | 78 +++ 8 files changed, 1670 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java index 2979b44..2bd0e67 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java +++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java @@ -40,7 +40,7 @@ import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.contrib.parser.DelimitedSchema; import 
com.datatorrent.contrib.parser.DelimitedSchema.Field; -import com.datatorrent.contrib.parser.DelimitedSchema.FieldType; +import com.datatorrent.contrib.parser.Schema.FieldType; import com.datatorrent.lib.formatter.Formatter; import com.datatorrent.netlet.util.DTThrowable; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java index e7840aa..292031e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java @@ -40,7 +40,7 @@ import org.supercsv.util.CsvContext; import org.apache.commons.lang3.StringUtils; -import com.datatorrent.contrib.parser.DelimitedSchema.FieldType; +import com.datatorrent.contrib.parser.Schema.FieldType; /** * Helper class with methods to generate CellProcessor objects. 
Cell processors http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java index 29b2c92..a47e138 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java @@ -52,40 +52,9 @@ import org.slf4j.LoggerFactory; * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": { * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type": * "Boolean", "constraints": { "required": "true" } } ] }} - * - * @since 3.4.0 */ -public class DelimitedSchema +public class DelimitedSchema extends Schema { - - /** - * JSON key string for separator - */ - private static final String SEPARATOR = "separator"; - /** - * JSON key string for quote character - */ - private static final String QUOTE_CHAR = "quoteChar"; - /** - * JSON key string for line delimiter - */ - private static final String LINE_DELIMITER = "lineDelimiter"; - /** - * JSON key string for fields array - */ - private static final String FIELDS = "fields"; - /** - * JSON key string for name of the field within fields array - */ - private static final String NAME = "name"; - /** - * JSON key string for type of the field within fields array - */ - private static final String TYPE = "type"; - /** - * JSON key string for constraints for each field - */ - private static final String CONSTRAINTS = "constraints"; /** * JSON key string for required constraint */ @@ -119,21 +88,26 @@ public class DelimitedSchema */ public static final String REGEX_PATTERN = "pattern"; /** - * JSON key string for date format constraint - */ - public static final String DATE_FORMAT = "format"; - /** * JSON key 
string for locale constraint */ public static final String LOCALE = "locale"; /** - * JSON key string for true value constraint + * JSON key string for separator + */ + private static final String SEPARATOR = "separator"; + /** + * JSON key string for quote character + */ + private static final String QUOTE_CHAR = "quoteChar"; + /** + * JSON key string for line delimiter */ - public static final String TRUE_VALUE = "trueValue"; + private static final String LINE_DELIMITER = "lineDelimiter"; /** - * JSON key string for false value constraint + * JSON key string for constraints for each field */ - public static final String FALSE_VALUE = "falseValue"; + private static final String CONSTRAINTS = "constraints"; + private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class); /** * delimiter character provided in schema. Default is , */ @@ -147,22 +121,10 @@ public class DelimitedSchema */ private String lineDelimiter = "\r\n"; /** - * This holds the list of field names in the same order as in the schema - */ - private List<String> fieldNames = new LinkedList<String>(); - /** * This holds list of {@link Field} */ private List<Field> fields = new LinkedList<Field>(); - /** - * Supported data types - */ - public enum FieldType - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; - public DelimitedSchema(String json) { try { @@ -207,16 +169,6 @@ public class DelimitedSchema } /** - * Get the list of field names mentioned in schema - * - * @return fieldNames - */ - public List<String> getFieldNames() - { - return Collections.unmodifiableList(fieldNames); - } - - /** * Get the delimiter character * * @return delimiterChar @@ -268,65 +220,16 @@ public class DelimitedSchema * field has a name, type and a set of associated constraints. 
* */ - public class Field + public class Field extends Schema.Field { /** - * name of the field - */ - String name; - /** - * Data type of the field - */ - FieldType type; - /** * constraints associated with the field */ Map<String, Object> constraints = new HashMap<String, Object>(); public Field(String name, String type) { - this.name = name; - this.type = FieldType.valueOf(type.toUpperCase()); - } - - /** - * Get the name of the field - * - * @return name - */ - public String getName() - { - return name; - } - - /** - * Set the name of the field - * - * @param name - */ - public void setName(String name) - { - this.name = name; - } - - /** - * Get {@link FieldType} - * - * @return type - */ - public FieldType getType() - { - return type; - } - - /** - * Set {@link FieldType} - * - * @param type - */ - public void setType(FieldType type) - { - this.type = type; + super(name, type); } /** @@ -356,6 +259,4 @@ public class DelimitedSchema } } - private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class); - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java new file mode 100644 index 0000000..9ee556e --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.contrib.parser; + +import java.lang.reflect.Field; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.ClassUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.univocity.parsers.fixed.FieldAlignment; +import com.univocity.parsers.fixed.FixedWidthFields; +import com.univocity.parsers.fixed.FixedWidthParserSettings; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Operator that parses a fixed width record against a specified schema <br> + * Schema is specified in a json format as per {@link FixedWidthSchema} that + * contains field information for each field.<br> + * Assumption is that each field in the data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>schema</b>:schema as a string<br> + * <b>clazz</b>:Pojo class <br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a byte array. 
Each tuple represents a record<br> + * <b>parsedOutput</b>:tuples that are validated against the schema are emitted + * as HashMap<String,Object> on this port<br> + * Key being the name of the field and Val being the value of the field. + * <b>out</b>:tuples that are validated against the schema are emitted as pojo + * on this port<br> + * <b>err</b>:tuples that do not confine to schema are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason. + * + * @displayName FixedWidthParser + * @category Parsers + * @tags fixedwidth pojo parser + */ +public class FixedWidthParser extends Parser<byte[], KeyValPair<String, String>> implements Operator.ActivationListener<Context> +{ + private static final Logger logger = LoggerFactory.getLogger(FixedWidthParser.class); + public final transient DefaultOutputPort<HashMap<String, Object>> parsedOutput = new DefaultOutputPort<HashMap<String, Object>>(); + /** + * Metric to keep count of number of tuples emitted on {@link #parsedOutput} + * port + */ + @AutoMetric + private long parsedOutputCount; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link FixedWidthSchema} + */ + @NotNull + private String jsonSchema; + /** + * Total length of the record + */ + private int recordLength; + /** + * Schema is read into this object to access fields + */ + private transient FixedWidthSchema schema; + /** + * List of setters to set the value in POJO to be emitted + */ + private transient List<FixedWidthParser.TypeInfo> setters; + /** + * header- This will be string of field names, padded with padding character (if required) + */ + private transient String header; + /** + * Univocity Parser to parse the input tuples + */ + private com.univocity.parsers.fixed.FixedWidthParser univocityParser; + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + parsedOutputCount = 0; + } + + @Override + public void processTuple(byte[] 
tuple) + { + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(null, "Blank/null tuple")); + logger.error("Tuple could not be parsed. Reason Blank/null tuple"); + } + errorTupleCount++; + return; + } + String incomingString = new String(tuple); + if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, getHeader())) { + if (err.isConnected()) { + err.emit(new KeyValPair<>(incomingString, "Blank/header tuple")); + logger.error("Tuple could not be parsed. Reason Blank/header tuple"); + } + errorTupleCount++; + return; + } + if (incomingString.length() < recordLength) { + if (err.isConnected()) { + err.emit(new KeyValPair<>(incomingString, "Record length mis-match/shorter tuple")); + } + logger.error("Tuple could not be parsed. Reason Record length mis-match/shorter tuple. " + + "Expected length " + recordLength + " Actual length " + incomingString.length()); + errorTupleCount++; + return; + } + if (incomingString.length() > recordLength) { + if (err.isConnected()) { + err.emit(new KeyValPair<>(incomingString, "Record length mis-match/longer tuple")); + } + logger.error("Tuple could not be parsed. Reason Record length mis-match/longer tuple. " + + "Expected length " + recordLength + " Actual length " + incomingString.length()); + errorTupleCount++; + return; + } + try { + String[] values = univocityParser.parseLine(incomingString); + HashMap<String, Object> toEmit = new HashMap(); + Object pojo = validateAndSet(values, toEmit); + if (parsedOutput.isConnected()) { + parsedOutput.emit(toEmit); + parsedOutputCount++; + } + if (out.isConnected() && clazz != null) { + out.emit(pojo); + emittedObjectCount++; + } + } catch (Exception e) { + if (err.isConnected()) { + err.emit(new KeyValPair<>(incomingString, e.getMessage())); + } + errorTupleCount++; + logger.error("Tuple could not be parsed. 
Reason {}", e.getMessage()); + } + } + + @Override + public KeyValPair<String, String> processErrorTuple(byte[] input) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public Object convert(byte[] tuple) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public void setup(Context.OperatorContext context) + { + try { + schema = new FixedWidthSchema(jsonSchema); + recordLength = 0; + List<FixedWidthSchema.Field> fields = schema.getFields(); + for (int i = 0; i < fields.size(); i++) { + recordLength += fields.get(i).getFieldLength(); + } + createUnivocityParser(); + } catch (Exception e) { + logger.error("Cannot setup Parser Reason {}", e.getMessage()); + throw e; + } + } + + /** + * Activate the Parser + */ + @Override + public void activate(Context context) + { + try { + if (clazz != null) { + setters = new ArrayList<>(); + List<String> fieldNames = schema.getFieldNames(); + if (fieldNames != null) { + for (String fieldName : fieldNames) { + addSetter(fieldName); + } + } + } + } catch (Exception e) { + logger.error("Cannot activate Parser Reason {}", e.getMessage()); + throw e; + } + } + + /** + * Function to create a univocity Parser + */ + private void createUnivocityParser() + { + List<FixedWidthSchema.Field> fields = schema.getFields(); + FixedWidthFields fieldWidthFields = new FixedWidthFields(); + + for (int i = 0; i < fields.size(); i++) { + FixedWidthSchema.Field currentField = fields.get(i); + int fieldLength = currentField.getFieldLength(); + FieldAlignment currentFieldAlignment; + + if (currentField.getAlignment().equalsIgnoreCase("centre")) { + currentFieldAlignment = FieldAlignment.CENTER; + } else if (currentField.getAlignment().equalsIgnoreCase("left")) { + currentFieldAlignment = FieldAlignment.LEFT; + } else { + currentFieldAlignment = FieldAlignment.RIGHT; + } + fieldWidthFields.addField(currentField.getName(), fieldLength, currentFieldAlignment, currentField.getPadding()); + } + 
FixedWidthParserSettings settings = new FixedWidthParserSettings(fieldWidthFields); + univocityParser = new com.univocity.parsers.fixed.FixedWidthParser(settings); + } + + @Override + public void deactivate() + { + + } + + /** + * Function to add a setter for a field and add it + * to the List of setters + * + * @param fieldName name of the field for which setter is to be added + */ + private void addSetter(String fieldName) + { + try { + Field f = clazz.getDeclaredField(fieldName); + FixedWidthParser.TypeInfo t = new FixedWidthParser.TypeInfo(f.getName(), + ClassUtils.primitiveToWrapper(f.getType())); + t.setter = PojoUtils.createSetter(clazz, t.name, t.type); + setters.add(t); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Field " + fieldName + " not found in class " + clazz, e); + } catch (Exception e) { + throw new RuntimeException("Exception while adding a setter" + e.getMessage(), e); + } + } + + /** + * Function to validate individual parsed values and set the objects to be emitted + * @param values array of String containing individual parsed values + * @param toEmit the map to be emitted + * @return POJO the object to be returned (if the tuple class is set) + */ + private Object validateAndSet(String[] values, HashMap toEmit) + { + Object pojoObject = null; + try { + List<FixedWidthSchema.Field> fields = schema.getFields(); + try { + if (clazz != null) { + pojoObject = clazz.newInstance(); + } + } catch (InstantiationException ie) { + throw new RuntimeException("Exception in instantiating", ie); + } + for (int i = 0; i < fields.size(); i++) { + FixedWidthSchema.Field currentField = fields.get(i); + FixedWidthParser.TypeInfo typeInfo = setters.get(i); + validateAndSetCurrentField(currentField, + values[i], typeInfo, pojoObject, toEmit); + } + } catch (StringIndexOutOfBoundsException e) { + throw new RuntimeException("Record length and tuple length mismatch ", e); + } catch (IllegalAccessException ie) { + throw new 
RuntimeException("Illegal Access ", ie); + } catch (Exception e) { + throw new RuntimeException("Exception in validation", e); + } + return pojoObject; + } + + /** + * Function to validate and set the current field. + * @param currentField the field which is to be validated and set + * @param value the parsed value of the field + * @param typeInfo information about the field in POJO + * @param pojoObject POJO which is to be set + * @param toEmit the map to be emitted + */ + private void validateAndSetCurrentField(FixedWidthSchema.Field currentField, + String value, FixedWidthParser.TypeInfo typeInfo, Object pojoObject, HashMap toEmit) + { + try { + String fieldName = currentField.getName(); + if (value != null && !value.isEmpty()) { + Object result; + switch (currentField.getType()) { + case INTEGER: + result = Integer.parseInt(value); + break; + case DOUBLE: + result = Double.parseDouble(value); + break; + case STRING: + result = value; + break; + case CHARACTER: + result = value.charAt(0); + break; + case FLOAT: + result = Float.parseFloat(value); + break; + case LONG: + result = Long.parseLong(value); + break; + case SHORT: + result = Short.parseShort(value); + break; + case BOOLEAN: + if (value.compareToIgnoreCase(currentField.getTrueValue()) == 0) { + result = Boolean.parseBoolean("true"); + } else if (value.compareToIgnoreCase(currentField.getFalseValue()) == 0) { + result = Boolean.parseBoolean("false"); + } else { + throw new NumberFormatException(); + } + break; + case DATE: + DateFormat df = new SimpleDateFormat(currentField.getDateFormat()); + df.setLenient(false); + result = df.parse(value); + break; + default: + throw new RuntimeException("Invalid Type in Field", new Exception()); + } + toEmit.put(fieldName,result); + if (typeInfo != null && pojoObject != null) { + typeInfo.setter.set(pojoObject, result); + } + } else { + toEmit.put(fieldName,value); + } + } catch (NumberFormatException e) { + throw new RuntimeException("Error parsing" + value + " to 
Integer type", e); + } catch (ParseException e) { + throw new RuntimeException("Error parsing" + value, e); + }catch (Exception e) { + throw new RuntimeException("Error setting " + value + " in the given class" + typeInfo.toString(), e); + } + } + + /** + * Get the schema + * + * @return the Json schema + */ + public String getJsonSchema() + { + return jsonSchema; + } + + /** + * Set the jsonSchema + * + * @param jsonSchema schema to be set. + */ + public void setJsonSchema(String jsonSchema) + { + this.jsonSchema = jsonSchema; + } + + /** + * Get the header + * + * @return header- This will be string of field names, padded with padding character (if required) + */ + public String getHeader() + { + return header; + } + + /** + * Set the header + * + * @param header- This will be string of field names, padded with padding character (if required) + */ + public void setHeader(String header) + { + this.header = header; + } + + /** + * Get errorTupleCount + * + * @return errorTupleCount number of erroneous tuples. + */ + @VisibleForTesting + public long getErrorTupleCount() + { + return errorTupleCount; + } + + /** + * Get emittedObjectCount + * + * @return emittedObjectCount count of objects emitted. + */ + @VisibleForTesting + public long getEmittedObjectCount() + { + return emittedObjectCount; + } + + /** + * Get incomingTuplesCount + * + * @return incomingTuplesCount number of incoming tuples. + */ + @VisibleForTesting + public long getIncomingTuplesCount() + { + return incomingTuplesCount; + } + + /** + * Get parsedOutputCount + * + * @return parsedOutPutCount count of well parsed tuples. + */ + @VisibleForTesting + public long getParsedOutputCount() + { + return parsedOutputCount; + } + + /** + * Objects of this class represents a particular data member of the Class to be emitted. + * Each data member has a name, type and a accessor(setter) function associated with it. 
+ */ + static class TypeInfo + { + String name; + Class type; + PojoUtils.Setter setter; + + public TypeInfo(String name, Class<?> type) + { + this.name = name; + this.type = type; + } + + public String toString() + { + return "'name': " + name + " 'type': " + type; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java new file mode 100644 index 0000000..e64125b --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.datatorrent.contrib.parser; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * This is schema that defines fields and their constraints for fixed width files + * The operators use this information to validate the incoming tuples. + * Information from JSON schema is saved in this object and is used by the + * operators + * <p> + * <br> + * <br> + * Example schema <br> + * <br> + * {{ "padding": " ","alignment"="left","fields": [ {"name": "adId", + * "type": "Integer","padding":"0", "length": 3} , { "name": "adName", + * "type": "String", "alignment": "right","fieldLength": 20}, { "name": "bidPrice", "type": + * "Double", "length": 5}, { "name": "startDate", "type": "Date", "length": 10, + * "format":"dd/MM/yyyy" }, { "name": "securityCode", "type": "Long","length": 10 }, + * { "name": "active", "type":"Boolean","length": 2} ] }} + */ +public class FixedWidthSchema extends Schema +{ + /** + * JSON key string for record length + */ + public static final String FIELD_LENGTH = "length"; + /** + * JSON key string for Padding Character + */ + public static final String FIELD_PADDING_CHARACTER = "padding"; + /** + * Default Padding Character + */ + public static final char DEFAULT_PADDING_CHARACTER = ' '; + /** + * Default Alignment + */ + public static final String DEFAULT_ALIGNMENT= "left"; + /** + * JSON key string for Field Alignment + */ + public static final String FIELD_ALIGNMENT ="alignment"; + + public static final Logger logger = LoggerFactory.getLogger(FixedWidthSchema.class); + /** + * This holds list of {@link Field} + */ + private List<Field> fields = new LinkedList<>(); + /** + * This holds the padding character for the entire file + */ + private char 
globalPadding; + /** + * This holds the global alignment + */ + private String globalAlignment; + + /** + * Constructor for FixedWidthSchema + */ + public FixedWidthSchema(String json) + { + try { + initialize(json); + } catch (JSONException | IOException e) { + logger.error("{}", e); + throw new IllegalArgumentException(e); + } + } + + /** + * Get the Padding character + * @return the padding character for the entire file + */ + public char getGlobalPadding() + { + return globalPadding; + } + + /** + * Set the padding character + * @param padding the padding character for the entire file + */ + public void setGlobalPadding(char padding) + { + this.globalPadding = padding; + } + + /** + * Get the global alignment + * @return globalAlignment the global alignment for the entire file. + */ + public String getGlobalAlignment() + { + return globalAlignment; + } + + /** + * Set the global alignment + * @param globalAlignment the global alignment for the entire file + */ + public void setGlobalAlignment(String globalAlignment) + { + this.globalAlignment = globalAlignment; + } + + /** + * For a given json string, this method sets the field members + * + * @param json schema as provided by the user. 
+ */ + private void initialize(String json) throws JSONException, IOException + { + + JSONObject jo = new JSONObject(json); + JSONArray fieldArray = jo.getJSONArray(FIELDS); + if (jo.has(FIELD_PADDING_CHARACTER)) { + globalPadding = jo.getString(FIELD_PADDING_CHARACTER).charAt(0); + } else { + globalPadding = DEFAULT_PADDING_CHARACTER; + } + if (jo.has(FIELD_ALIGNMENT)) { + globalAlignment = jo.getString(FIELD_ALIGNMENT); + } else { + globalAlignment = DEFAULT_ALIGNMENT; + } + + for (int i = 0; i < fieldArray.length(); i++) { + JSONObject obj = fieldArray.getJSONObject(i); + Field field = new Field(obj.getString(NAME), + obj.getString(TYPE).toUpperCase(), obj.getInt(FIELD_LENGTH)); + if(obj.has(FIELD_PADDING_CHARACTER)) { + field.setPadding(obj.getString(FIELD_PADDING_CHARACTER).charAt(0)); + } else { + field.setPadding(globalPadding); + } + if(obj.has(FIELD_ALIGNMENT)) { + field.setAlignment(obj.getString(FIELD_ALIGNMENT)); + } else { + field.setAlignment(globalAlignment); + } + //Get the format if the given data type is Date + if (field.getType() == FieldType.DATE) { + if (obj.has(DATE_FORMAT)) { + field.setDateFormat(obj.getString(DATE_FORMAT)); + } else { + field.setDateFormat(DEFAULT_DATE_FORMAT); + } + + } + //Get the trueValue and falseValue if the data type is Boolean + if (field.getType() == FieldType.BOOLEAN) { + if (obj.has(TRUE_VALUE)) { + field.setTrueValue(obj.getString(TRUE_VALUE)); + } else { + field.setTrueValue(DEFAULT_TRUE_VALUE); + } + if (obj.has(FALSE_VALUE)) { + field.setFalseValue(obj.getString(FALSE_VALUE)); + } else { + field.setFalseValue(DEFAULT_FALSE_VALUE); + } + + } + fields.add(field); + fieldNames.add(field.name); + } + } + + /** + * Get the list of Fields. + * + * @return fields list of {@link Field} + */ + public List<Field> getFields() + { + return Collections.unmodifiableList(fields); + } + + /** + * Objects of this class represents a particular field in the schema. Each + * field has a name, type and a fieldLength. 
+ * In case of type Date we need a dateFormat. + * + */ + public class Field extends Schema.Field + { + /** + * Length of the field + */ + private int fieldLength; + /** + * Parameter to specify format of date + */ + private String dateFormat; + /** + * Parameter to specify true value of Boolean + */ + private String trueValue; + /** + * Parameter to specify false value of Boolean + */ + private String falseValue; + /** + * Parameter to specify padding + */ + private char padding; + /** + * Parameter to specify alignment + */ + private String alignment; + + /** + * Constructor for Field + * @param name - name of the field. + * @param type - type of the field. + * @param fieldLength - length of the field. + */ + public Field(String name, String type, Integer fieldLength) + { + super(name, type); + this.fieldLength = fieldLength; + this.dateFormat = DEFAULT_DATE_FORMAT; + this.trueValue = DEFAULT_TRUE_VALUE; + this.falseValue = DEFAULT_FALSE_VALUE; + this.padding=' '; + this.alignment=DEFAULT_ALIGNMENT; + } + + /** + * Get the Length of the Field + * @return fieldLength length of the field. + */ + public int getFieldLength() + { + return fieldLength; + } + + /** + * Set the end pointer of the field + * + * @param fieldLength length of the field. + */ + public void setFieldLength(Integer fieldLength) + { + this.fieldLength = fieldLength; + } + + /** + * Get the dateFormat of the field + * + * @return dateFormat format of date given. + */ + public String getDateFormat() + { + return dateFormat; + } + + /** + * Set the the dateFormat of the field + * + * @param dateFormat sets the format of date. + */ + public void setDateFormat(String dateFormat) + { + this.dateFormat = dateFormat; + } + + /** + * Get the trueValue of the Boolean field + * @return trueValue gets the equivalent true value. + */ + public String getTrueValue() + { + return trueValue; + } + + /** + * Set the trueValue of the Boolean field + * + * @param trueValue sets the equivalent true value. 
+ */ + public void setTrueValue(String trueValue) + { + this.trueValue = trueValue; + } + + /** + * Get the falseValue of the Boolean field + * @return falseValue gets the equivalent false value. + */ + public String getFalseValue() + { + return falseValue; + } + + /** + * Set the end pointer of the field + * + * @param falseValue sets the equivalent false value. + */ + public void setFalseValue(String falseValue) + { + this.falseValue = falseValue; + } + /** + * Get the field padding + * @return padding gets the padding for the individual field. + */ + public char getPadding() + { + return padding; + } + + /** + * Set the field padding + * @param padding sets the padding for the individual field. + */ + public void setPadding(char padding) + { + this.padding = padding; + } + + /** + * Get the field alignment + * @return alignment gets the alignment for the individual field. + */ + public String getAlignment() + { + return alignment; + } + + /** + * Set the field alignment + * @param alignment sets the alignment for the individual field. + */ + public void setAlignment(String alignment) + { + this.alignment = alignment; + } + + @Override + public String toString() + { + return "Fields [name=" + name + ", type=" + type + " fieldLength= " + fieldLength + "]"; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java new file mode 100644 index 0000000..c09ff92 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * <p>
+ * This is a schema that defines fields and their constraints for delimited and fixed-width files.
+ * The operators use this information to validate the incoming tuples.
+ * Information from the JSON schema is saved in this object and is used by the
+ * operators.
+ */
+public class Schema
+{
+  /**
+   * JSON key string for fields array
+   */
+  public static final String FIELDS = "fields";
+  /**
+   * JSON key string for name of the field within fields array
+   */
+  public static final String NAME = "name";
+  /**
+   * JSON key string for type of the field within fields array
+   */
+  public static final String TYPE = "type";
+  /**
+   * JSON key string for date format constraint
+   */
+  public static final String DATE_FORMAT = "format";
+  /**
+   * JSON key string for true value constraint
+   */
+  public static final String TRUE_VALUE = "trueValue";
+  /**
+   * JSON key string for false value constraint
+   */
+  public static final String FALSE_VALUE = "falseValue";
+  /**
+   * Default string that represents true for a Boolean field
+   */
+  public static final String DEFAULT_TRUE_VALUE = "true";
+  /**
+   * Default string that represents false for a Boolean field
+   */
+  public static final String DEFAULT_FALSE_VALUE = "false";
+  /**
+   * Default date format; like the schema "format" values, MM means month and mm would mean minutes.
+   */
+  public static final String DEFAULT_DATE_FORMAT = "dd/MM/yy";
+  /**
+   * This holds the list of field names in the same order as in the schema
+   */
+  protected List<String> fieldNames = new LinkedList<String>();
+
+  /**
+   * Get the list of field names mentioned in schema
+   *
+   * @return an unmodifiable view of the field names, in schema order
+   */
+  public List<String> getFieldNames()
+  {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Supported data types
+   */
+  public enum FieldType
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  }
+
+  /**
+   * Objects of this class represent a particular field in the schema. Each
+   * field has a name, type and a set of associated constraints.
+   */
+  public class Field
+  {
+    /**
+     * name of the field
+     */
+    String name;
+    /**
+     * Data type of the field
+     */
+    FieldType type;
+
+    /**
+     * Parameterized Constructor
+     * @param name name of the field.
+     * @param type name of the data type, matched case-insensitively (locale-independent) against {@link FieldType}.
+     * @throws IllegalArgumentException if type does not name a {@link FieldType} constant.
+     */
+    public Field(String name, String type)
+    {
+      this.name = name;
+      this.type = FieldType.valueOf(type.toUpperCase(Locale.ROOT));
+    }
+
+    /**
+     * Default Constructor
+     */
+    public Field()
+    {
+    }
+
+    /**
+     * Get the name of the field
+     *
+     * @return name
+     */
+    public String getName()
+    {
+      return name;
+    }
+
+    /**
+     * Set the name of the field
+     *
+     * @param name name of the field
+     */
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Get {@link FieldType}
+     *
+     * @return type
+     */
+    public FieldType getType()
+    {
+      return type;
+    }
+
+    /**
+     * Set {@link FieldType}
+     *
+     * @param type data type of the field
+     */
+    public void setType(FieldType type)
+    {
+      this.type = type;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", type=" + type + "]";
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
new file mode 100644
index 0000000..bc72efd
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class FixedWidthTest
+{
+  // Tests for FixedWidthParser driven by the schema in src/test/resources/FixedWidthSchema.json.
+  private static final String filename = "FixedWidthSchema.json";
+  @Rule
+  public Watcher watcher = new Watcher();
+  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+  FixedWidthParser parser = new FixedWidthParser();
+
+  /*
+   * Field order: adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid
+   * Widths from FixedWidthSchema.json: 3,3,10,3,19,10,5,1,1,10,1,10 = 76 characters per record; default padding is '_'.
+   * e.g. 120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes followed by the spaces that pad "valid".
+   * NOTE(review): runs of spaces inside the input literals below look collapsed in this copy (several are < 76 chars); verify upstream.
+   */
+  @Test
+  public void TestParserValidInput()
+  {
+    String input = "120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Ad adPojo = (Ad)obj;
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Ad.class, obj.getClass());
+    // Expected values assume padding is stripped: adId "120" ('0'-padded) -> 12, securityCode "12___" -> 12.
+    Assert.assertEquals(12, adPojo.getAdId());
+    Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
+    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
+    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
+    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
+    Assert.assertEquals(12, adPojo.getSecurityCode());
+    Assert.assertTrue("CAMP_AD123".equals(adPojo.getParentCampaign()));
+    Assert.assertTrue("yes".equals(adPojo.getValid()));
+    Assert.assertTrue(adPojo.isActive());
+    Assert.assertFalse(adPojo.isOptimized());
+  }
+
+  @Test
+  public void TestParserValidInputPojoPortNotConnected()
+  {
+    parser.out.setSink(null);
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserValidInputClassNameNotProvided()
+  {
+    parser.setClazz(null);
+    String input = "120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidAdIdInput()
+  {
+    String input = "1c2982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserNoCampaignIdInput()
+  {
+    String input = "123 _____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidCampaignIdInput()
+  {
+    String input = "1239c2_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserInvalidBidPriceInput()
+  {
+    String input = "123982_____adxyz0..2015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserInvalidStartDateInput()
+  {
+    String input = "123982_____adxyz0.22015-31-13 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserInvalidSecurityCodeInput()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212b__y_CAMP_AD123Yyes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserInvalidisActiveInput()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___p_CAMP_AD123Yyes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserNullOrBlankInput()
+  {
+    parser.beginWindow(0);
+    parser.in.process(null);
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserHeaderAsInput()
+  {
+    parser.setHeader("aIdcIdadName bidstartDate endDate sCodeBBparentCampWvalid ");
+    String input = "aIdcIdadName bidstartDate endDate sCodeBBparentCampWvalid ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserShorterRecord()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserShorterRecordOnlyPOJOPortConnected()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.parsedOutput.setSink(null);
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserLongerRecord()
+  {
+    // Record carries extra trailing data beyond the schema width; it is expected on the error port.
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserLongerRecordOnlyPOJOPortConnected()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff";
+    parser.parsedOutput.setSink(null);
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  @Test
+  public void TestParserValidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.getParsedOutputCount());
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.getParsedOutputCount());
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void TestParserInvalidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.getParsedOutputCount());
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    parser.in.process("123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff"
+        .getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, parser.getParsedOutputCount());
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(1, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void TestParserValidInputMetricResetCheck()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.getParsedOutputCount());
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.getParsedOutputCount());
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.beginWindow(1);
+    Assert.assertEquals(0, parser.getParsedOutputCount());
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    parser.in.process(input.getBytes());
+    Assert.assertEquals(1, parser.getParsedOutputCount());
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.endWindow();
+  }
+
+  public static class Ad
+  {
+    // POJO filled by the parser (see setClazz(Ad.class)); field names match those in FixedWidthSchema.json.
+    private int adId;
+    private int campaignId;
+    private String adName;
+    private double bidPrice;
+    private Date startDate;
+    private Date endDate;
+    private long securityCode;
+    private boolean active;
+    private boolean optimized;
+    private String parentCampaign;
+    private Character weatherTargeted;
+    private String valid;
+
+    public Ad()
+    {
+
+    }
+
+    public int getAdId()
+    {
+      return adId;
+    }
+
+    public void setAdId(int adId)
+    {
+      this.adId = adId;
+    }
+
+    public int getCampaignId()
+    {
+      return campaignId;
+    }
+
+    public void setCampaignId(int campaignId)
+    {
+      this.campaignId = campaignId;
+    }
+
+    public String getAdName()
+    {
+      return adName;
+    }
+
+    public void setAdName(String adName)
+    {
+      this.adName = adName;
+    }
+
+    public double getBidPrice()
+    {
+      return bidPrice;
+    }
+
+    public void setBidPrice(double bidPrice)
+    {
+      this.bidPrice = bidPrice;
+    }
+
+    public Date getStartDate()
+    {
+      return startDate;
+    }
+
+    public void setStartDate(Date startDate)
+    {
+      this.startDate = startDate;
+    }
+
+    public Date getEndDate()
+    {
+      return endDate;
+    }
+
+    public void setEndDate(Date endDate)
+    {
+      this.endDate = endDate;
+    }
+
+    public long getSecurityCode()
+    {
+      return securityCode;
+    }
+
+    public void setSecurityCode(long securityCode)
+    {
+      this.securityCode = securityCode;
+    }
+
+    public boolean isActive()
+    {
+      return active;
+    }
+
+    public void setActive(boolean active)
+    {
+      this.active = active;
+    }
+
+    public boolean isOptimized()
+    {
+      return optimized;
+    }
+
+    public void setOptimized(boolean optimized)
+    {
+      this.optimized = optimized;
+    }
+
+    public String getParentCampaign()
+    {
+      return parentCampaign;
+    }
+
+    public void setParentCampaign(String parentCampaign)
+    {
+      this.parentCampaign = parentCampaign;
+    }
+
+    public Character getWeatherTargeted()
+    {
+      return weatherTargeted;
+    }
+
+    public void setWeatherTargeted(Character weatherTargeted)
+    {
+      this.weatherTargeted = weatherTargeted;
+    }
+
+    public String getValid()
+    {
+      return valid;
+    }
+
+    public void setValid(String valid)
+    {
+      this.valid = valid;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice
+          + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active="
+          + active + ", optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted="
+          + weatherTargeted + ", valid=" + valid + "]";
+    }
+  }
+  // JUnit rule: starting() configures the parser and attaches the sinks; finished() clears the sinks and tears down.
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      parser.setClazz(Ad.class);
+      parser.setJsonSchema(SchemaUtils.jarResourceFileToString(filename));
+      parser.setup(null);
+      parser.activate(null);
+      parser.err.setSink(error);
+      parser.parsedOutput.setSink(objectPort);
+      parser.out.setSink(pojoPort);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      objectPort.clear();
+      pojoPort.clear();
+      parser.teardown();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/resources/FixedWidthSchema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/FixedWidthSchema.json b/contrib/src/test/resources/FixedWidthSchema.json
new file mode 100644
index 0000000..38d7cc7
--- /dev/null
+++ b/contrib/src/test/resources/FixedWidthSchema.json
@@ -0,0 +1,78 @@
+{
+  "padding": "_",
+  "alignment": "left",
+  "fields": [
+    {
+      "name": "adId",
+      "type": "Integer",
+      "length": "3",
+      "padding": "0"
+    },
+    {
+      "name": "campaignId",
+      "type": "Integer",
+      "length":
"3", + "padding": " " + + }, + { + "name": "adName", + "type": "String", + "length": "10", + "alignment":"right" + }, + { + "name": "bidPrice", + "type": "Double", + "length": "3" + }, + { + "name": "startDate", + "type": "Date", + "format": "yyyy-MM-dd HH:mm:ss", + "length": "19" + + }, + { + "name": "endDate", + "type": "Date", + "format": "dd/MM/yyyy", + "length": "10" + }, + { + "name": "securityCode", + "type": "Long", + "length": "5" + }, + { + "name": "active", + "type": "Boolean", + "length": "1", + "trueValue": "y", + "falseValue": "n" + }, + { + "name": "optimized", + "type": "Boolean", + "length": "1", + "trueValue": "y", + "falseValue": "n" + }, + { + "name": "parentCampaign", + "type": "String", + "length": "10" + }, + { + "name": "weatherTargeted", + "type": "Character", + "length": "1" + }, + { + "name": "valid", + "type": "String", + "length": "10", + "padding":" " + } + ] +}
