Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 e1a45507b -> 3f4fe1866
MLHR-1838 Added pojo parsers and formatters(csv,json,xml) Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3f4fe186 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3f4fe186 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3f4fe186 Branch: refs/heads/devel-3 Commit: 3f4fe18665c59dadb8ad289f696df983bdc451ce Parents: e1a4550 Author: shubham <[email protected]> Authored: Fri Sep 11 16:26:03 2015 +0530 Committer: shubham <[email protected]> Committed: Wed Oct 14 10:53:12 2015 +0530 ---------------------------------------------------------------------- contrib/pom.xml | 12 + .../contrib/converter/Converter.java | 43 +++ .../contrib/schema/formatter/CsvFormatter.java | 285 +++++++++++++++++ .../contrib/schema/formatter/Formatter.java | 101 ++++++ .../contrib/schema/formatter/JsonFormatter.java | 109 +++++++ .../contrib/schema/formatter/XmlFormatter.java | 172 ++++++++++ .../contrib/schema/parser/CsvParser.java | 314 +++++++++++++++++++ .../contrib/schema/parser/JsonParser.java | 106 +++++++ .../contrib/schema/parser/Parser.java | 102 ++++++ .../contrib/schema/parser/XmlParser.java | 141 +++++++++ .../schema/formatter/CsvFormatterTest.java | 147 +++++++++ .../schema/formatter/JsonFormatterTest.java | 186 +++++++++++ .../schema/formatter/XmlFormatterTest.java | 226 +++++++++++++ .../contrib/schema/parser/CsvParserTest.java | 172 ++++++++++ .../contrib/schema/parser/JsonParserTest.java | 212 +++++++++++++ .../contrib/schema/parser/XmlParserTest.java | 254 +++++++++++++++ 16 files changed, 2582 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index abed040..91ef5c7 100755 --- 
a/contrib/pom.xml +++ b/contrib/pom.xml @@ -606,5 +606,17 @@ <version>${dt.framework.version}</version> <type>jar</type> </dependency> + <dependency> + <!-- required by Xml parser and formatter --> + <groupId>com.thoughtworks.xstream</groupId> + <artifactId>xstream</artifactId> + <version>1.4.8</version> + </dependency> + <dependency> + <!-- required by Csv parser and formatter --> + <groupId>net.sf.supercsv</groupId> + <artifactId>super-csv-joda</artifactId> + <version>2.3.1</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java new file mode 100644 index 0000000..ebf2925 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.converter; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Operators that convert tuples from one format to another must implement + * this interface, e.g. parsers or formatters that take data of a certain + * format and convert it to another format. + * + * @param <INPUT> type of the tuple accepted by convert + * @param <OUTPUT> type of the tuple produced by convert + */ +@InterfaceStability.Evolving +public interface Converter<INPUT, OUTPUT> +{ + /** + * Provide the implementation for converting tuples from one format to the + * other + * + * @param tuple + * tuple of certain format + * @return the converted tuple, or null if the tuple could not be converted + */ + public OUTPUT convert(INPUT tuple); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java new file mode 100644 index 0000000..924acc6 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.schema.formatter; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.FmtDate; +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.exception.SuperCsvException; +import org.supercsv.io.CsvBeanWriter; +import org.supercsv.io.ICsvBeanWriter; +import org.supercsv.prefs.CsvPreference; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts POJO to CSV string <br> + * Assumption is that each field in the delimited data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>fieldInfo</b>:User need to specify fields and their types as a comma + * separated string having format <NAME>:<TYPE>|<FORMAT> in + * the same order as incoming data. 
FORMAT refers to dates with dd/MM/yyyy as + * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy + * + * @displayName CsvFormatter + * @category Formatter + * @tags pojo csv formatter + */ +@InterfaceStability.Evolving +public class CsvFormatter extends Formatter<String> +{ + + private ArrayList<Field> fields; + // Not read anywhere in this class and has no property setter, so it carries no + // @NotNull: such a constraint could never be satisfied through configuration. + protected String classname; + @NotNull + protected int fieldDelimiter; + protected String lineDelimiter; + + @NotNull + protected String fieldInfo; + + public enum FIELD_TYPE + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE + }; + + protected transient String[] nameMapping; + protected transient CellProcessor[] processors; + protected transient CsvPreference preference; + + public CsvFormatter() + { + fields = new ArrayList<Field>(); + fieldDelimiter = ','; + lineDelimiter = "\r\n"; + + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + //fieldInfo information + fields = new ArrayList<Field>(); + String[] fieldInfoTuple = fieldInfo.split(","); + for (int i = 0; i < fieldInfoTuple.length; i++) { + String[] fieldTuple = fieldInfoTuple[i].split(":"); + Field field = new Field(); + field.setName(fieldTuple[0]); + String[] typeFormat = fieldTuple[1].split("\\|"); + field.setType(typeFormat[0].toUpperCase()); + if (typeFormat.length > 1) { + field.setFormat(typeFormat[1]); + } + getFields().add(field); + } + preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); + int countKeyValue = getFields().size(); + nameMapping = new String[countKeyValue]; + processors = new CellProcessor[countKeyValue]; + initialise(nameMapping, processors); + + } + + private void initialise(String[] nameMapping, CellProcessor[] processors) + { + for (int i = 0; i < getFields().size(); i++) { + FIELD_TYPE type = getFields().get(i).type; + nameMapping[i] = getFields().get(i).name; + if (type == FIELD_TYPE.DATE) { + String dateFormat = getFields().get(i).format;
+ processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat)); + } else { + processors[i] = new Optional(); + } + } + + } + + @Override + public void activate(Context context) + { + + } + + @Override + public void deactivate() + { + + } + + @Override + public String convert(Object tuple) + { + try { + StringWriter stringWriter = new StringWriter(); + ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference); + beanWriter.write(tuple, nameMapping, processors); + beanWriter.flush(); + beanWriter.close(); + return stringWriter.toString(); + } catch (SuperCsvException e) { + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + return null; + } + + public static class Field + { + String name; + String format; + FIELD_TYPE type; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public FIELD_TYPE getType() + { + return type; + } + + public void setType(String type) + { + this.type = FIELD_TYPE.valueOf(type); + } + + public String getFormat() + { + return format; + } + + public void setFormat(String format) + { + this.format = format; + } + } + + /** + * Gets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @return An array list of Fields. + */ + public ArrayList<Field> getFields() + { + return fields; + } + + /** + * Sets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @param fields + * An array list of Fields. + */ + public void setFields(ArrayList<Field> fields) + { + this.fields = fields; + } + + /** + * Gets the delimiter which separates fields in incoming data. + * + * @return fieldDelimiter + */ + public int getFieldDelimiter() + { + return fieldDelimiter; + } + + /** + * Sets the delimiter which separates fields in incoming data. 
+ * + * @param fieldDelimiter + */ + public void setFieldDelimiter(int fieldDelimiter) + { + this.fieldDelimiter = fieldDelimiter; + } + + /** + * Gets the delimiter which separates lines in incoming data. + * + * @return lineDelimiter + */ + public String getLineDelimiter() + { + return lineDelimiter; + } + + /** + * Sets the delimiter which separates line in incoming data. + * + * @param lineDelimiter + */ + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + /** + * Gets the name of the fields with type and format in data as comma separated + * string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @return fieldInfo + */ + public String getFieldInfo() + { + return fieldInfo; + } + + /** + * Sets the name of the fields with type and format in data as comma separated + * string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @param fieldInfo + */ + public void setFieldInfo(String fieldInfo) + { + this.fieldInfo = fieldInfo; + } + + private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java new file mode 100644 index 0000000..19a78e0 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.schema.formatter; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.converter.Converter; + +/** + * Abstract class that implements Converter interface. 
This is a schema enabled + * Formatter <br> + * Sub classes need to implement the convert method <br> + * <b>Port Interface</b><br> + * <b>in</b>: expects <Object> this is a schema enabled port<br> + * <b>out</b>: emits <OUTPUT> <br> + * <b>err</b>: emits <Object> error port that emits input tuple that could + * not be converted<br> + * <br> + * + * @displayName Formatter + * @tags formatter converter + * @param <OUTPUT> type of the formatted tuple emitted on out + */ +@InterfaceStability.Evolving +public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>, + ActivationListener<Context> +{ + protected transient Class<?> clazz; + + @OutputPortFieldAnnotation + public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>(); + + @OutputPortFieldAnnotation(optional = true) + public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>(); + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>() + { + @Override + public void setup(PortContext context) + { + // Only override clazz when the port actually carries a TUPLE_CLASS, so a + // class supplied via setClazz() is not replaced by null. + Class<?> tupleClass = context.getValue(Context.PortContext.TUPLE_CLASS); + if (tupleClass != null) { + clazz = tupleClass; + } + } + + @Override + public void process(Object inputTuple) + { + OUTPUT tuple = convert(inputTuple); + if (tuple == null) { + // Conversion failed: route the input to err when connected, and never emit null on out. + if (err.isConnected()) { + err.emit(inputTuple); + } + return; + } + if (out.isConnected()) { + out.emit(tuple); + } + } + }; + + /** + * Get the class that needs to be formatted + * + * @return Class<?> + */ + public Class<?> getClazz() + { + return clazz; + } + + /** + * Set the class of tuple that needs to be formatted + * + * @param clazz + */ + public void setClazz(Class<?> clazz) + { + this.clazz = clazz; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java new file mode 100644 index 0000000..344ac60 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.schema.formatter; + +import java.io.IOException; +import java.text.SimpleDateFormat; + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectWriter; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts POJO to JSON string <br> + * <b>Properties</b> <br> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy + * + * @displayName JsonFormatter + * @category Formatter + * @tags pojo json formatter + */ +@InterfaceStability.Evolving +public class JsonFormatter extends Formatter<String> +{ + private transient ObjectWriter writer; + protected String dateFormat; + + @Override + public void activate(Context context) + { + try { + ObjectMapper mapper = new ObjectMapper(); + if (dateFormat != null) { + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); + } + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true); + // Create the writer only after the mapper is fully configured: the ObjectWriter + // works from a copy of the mapper's serialization config taken at creation time. + writer = mapper.writerWithType(clazz); + } catch (Throwable e) { + throw new RuntimeException("Unable find provided class", e); + } + } + + @Override + public void deactivate() + { + + } + + @Override + public String convert(Object tuple) + { + try { + return writer.writeValueAsString(tuple); + } catch (JsonGenerationException | JsonMappingException e) { + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + return null; + } + + /** + * Get the date format + * + * @return Date format string + */ + public String
getDateFormat() + { + return dateFormat; + } + + /** + * Set the date format + * + * @param dateFormat + */ + public void setDateFormat(String dateFormat) + { + this.dateFormat = dateFormat; + } + + private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java new file mode 100644 index 0000000..b387031 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.schema.formatter; + +import java.io.Writer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.XStreamException; +import com.thoughtworks.xstream.converters.basic.DateConverter; +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; +import com.thoughtworks.xstream.io.xml.CompactWriter; +import com.thoughtworks.xstream.io.xml.XppDriver; + +/** + * Operator that converts POJO to XML string using XStream <br> + * <b>Properties</b> <br> + * <b>alias</b>: optional XML element name registered for the tuple class <br> + * <b>dateFormat</b>: optional date format registered via a DateConverter <br> + * <b>prettyPrint</b>: when true uses XStream's default writer, otherwise a CompactWriter + * + * @displayName XmlFormatter + * @category Formatter + * @tags xml pojo formatter + */ +@InterfaceStability.Evolving +public class XmlFormatter extends Formatter<String> +{ + + private transient XStream xstream; + + protected String alias; + protected String dateFormat; + protected boolean prettyPrint; + + public XmlFormatter() + { + alias = null; + dateFormat = null; + } + + @Override + public void activate(Context context) + { + if (prettyPrint) { + xstream = new XStream(); + } else { + xstream = new XStream(new XppDriver() + { + @Override + public HierarchicalStreamWriter createWriter(Writer out) + { + return new CompactWriter(out, getNameCoder()); + } + }); + } + if (alias != null) { + try { + xstream.alias(alias, clazz); + } catch (Throwable e) { + throw new RuntimeException("Unable find provided class", e); + } + } + if (dateFormat != null) { + xstream.registerConverter(new DateConverter(dateFormat, new String[] {})); + } + } + + @Override + public void deactivate() + { + + } + + @Override + public String convert(Object tuple) + { + try { + return xstream.toXML(tuple); + } catch (XStreamException e) { + logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage()); + return null; + } + } + + /** + * Gets the alias This is an optional step.
Without it XStream would work + * fine, but the XML element names would contain the fully qualified name of + * each class (including package) which would bulk up the XML a bit. + * + * @return alias. + */ + public String getAlias() + { + return alias; + } + + /** + * Sets the alias This is an optional step. Without it XStream would work + * fine, but the XML element names would contain the fully qualified name of + * each class (including package) which would bulk up the XML a bit. + * + * @param alias + * . + */ + public void setAlias(String alias) + { + this.alias = alias; + } + + /** + * Gets the date format e.g dd/mm/yyyy - this will be how a date would be + * formatted + * + * @return dateFormat. + */ + public String getDateFormat() + { + return dateFormat; + } + + /** + * Sets the date format e.g dd/mm/yyyy - this will be how a date would be + * formatted + * + * @param dateFormat + * . + */ + public void setDateFormat(String dateFormat) + { + this.dateFormat = dateFormat; + } + + /** + * Returns true if pretty print is enabled. + * + * @return prettyPrint + */ + public boolean isPrettyPrint() + { + return prettyPrint; + } + + /** + * Sets pretty print option. 
+ * + * @param prettyPrint + */ + public void setPrettyPrint(boolean prettyPrint) + { + this.prettyPrint = prettyPrint; + } + + private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java new file mode 100644 index 0000000..4fd39fb --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.schema.parser; + +import java.io.IOException; +import java.util.ArrayList; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ParseBool; +import org.supercsv.cellprocessor.ParseChar; +import org.supercsv.cellprocessor.ParseDate; +import org.supercsv.cellprocessor.ParseDouble; +import org.supercsv.cellprocessor.ParseInt; +import org.supercsv.cellprocessor.ParseLong; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.io.CsvBeanReader; +import org.supercsv.prefs.CsvPreference; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.util.ReusableStringReader; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts CSV string to Pojo <br> + * Assumption is that each field in the delimited data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>fieldInfo</b>:User need to specify fields and their types as a comma + * separated string having format <NAME>:<TYPE>|<FORMAT> in + * the same order as incoming data. 
FORMAT refers to dates with dd/MM/yyyy as + * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy <br> + * <b>fieldDelimiter</b>: Default is comma <br> + * <b>lineDelimiter</b>: Default is '\r\n' + * + * @displayName CsvParser + * @category Parsers + * @tags csv pojo parser + */ +@InterfaceStability.Evolving +public class CsvParser extends Parser<String> +{ + + private ArrayList<Field> fields; + @NotNull + protected int fieldDelimiter; + protected String lineDelimiter; + + @NotNull + protected String fieldInfo; + + protected transient String[] nameMapping; + protected transient CellProcessor[] processors; + private transient CsvBeanReader csvReader; + + public enum FIELD_TYPE + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE + }; + + @NotNull + private transient ReusableStringReader csvStringReader = new ReusableStringReader(); + + public CsvParser() + { + fields = new ArrayList<Field>(); + fieldDelimiter = ','; + lineDelimiter = "\r\n"; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + + logger.info("field info {}", fieldInfo); + fields = new ArrayList<Field>(); + String[] fieldInfoTuple = fieldInfo.split(","); + for (int i = 0; i < fieldInfoTuple.length; i++) { + String[] fieldTuple = fieldInfoTuple[i].split(":"); + Field field = new Field(); + field.setName(fieldTuple[0]); + String[] typeFormat = fieldTuple[1].split("\\|"); + field.setType(typeFormat[0].toUpperCase()); + if (typeFormat.length > 1) { + field.setFormat(typeFormat[1]); + } + getFields().add(field); + } + + CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); + csvReader = new CsvBeanReader(csvStringReader, preference); + int countKeyValue = getFields().size(); + logger.info("countKeyValue {}", countKeyValue); + nameMapping = new String[countKeyValue]; + processors = new CellProcessor[countKeyValue]; + initialise(nameMapping, processors); + } + + /** + * Fills nameMapping and processors with one entry per configured field, in + * fieldInfo order. Every processor is wrapped in Optional so empty cells map to null. + */ + private void
initialise(String[] nameMapping, CellProcessor[] processors) + { + for (int i = 0; i < getFields().size(); i++) { + FIELD_TYPE type = getFields().get(i).type; + nameMapping[i] = getFields().get(i).name; + if (type == FIELD_TYPE.DOUBLE) { + processors[i] = new Optional(new ParseDouble()); + } else if (type == FIELD_TYPE.INTEGER) { + processors[i] = new Optional(new ParseInt()); + } else if (type == FIELD_TYPE.FLOAT) { + processors[i] = new Optional(parseNarrowNumber(FIELD_TYPE.FLOAT)); + } else if (type == FIELD_TYPE.LONG) { + processors[i] = new Optional(new ParseLong()); + } else if (type == FIELD_TYPE.SHORT) { + processors[i] = new Optional(parseNarrowNumber(FIELD_TYPE.SHORT)); + } else if (type == FIELD_TYPE.STRING) { + processors[i] = new Optional(); + } else if (type == FIELD_TYPE.CHARACTER) { + processors[i] = new Optional(new ParseChar()); + } else if (type == FIELD_TYPE.BOOLEAN) { + processors[i] = new Optional(new ParseBool()); + } else if (type == FIELD_TYPE.DATE) { + String dateFormat = getFields().get(i).format; + processors[i] = new Optional(new ParseDate(dateFormat == null ?
"dd/MM/yyyy" : dateFormat)); + } + } + } + + /** + * Builds a cell processor that parses a non-null cell into a Float (FLOAT) or a + * Short (SHORT). ParseDouble/ParseInt would produce Double/Integer values, which + * cannot be assigned to a POJO field declared as float/short. + * + * @param type FIELD_TYPE.FLOAT or FIELD_TYPE.SHORT + * @return processor emitting a Float or a Short + */ + private static CellProcessor parseNarrowNumber(final FIELD_TYPE type) + { + return new org.supercsv.cellprocessor.CellProcessorAdaptor() + { + @Override + public Object execute(Object value, org.supercsv.util.CsvContext context) + { + validateInputNotNull(value, context); + String text = value.toString(); + Object parsed; + try { + // if/else rather than ?: because mixing Float and Short in a conditional + // expression unboxes both branches and promotes the result to float + if (type == FIELD_TYPE.FLOAT) { + parsed = Float.valueOf(text); + } else { + parsed = Short.valueOf(text); + } + } catch (NumberFormatException e) { + throw new org.supercsv.exception.SuperCsvCellProcessorException("'" + text + "' could not be parsed as " + type, context, this, e); + } + return next.execute(parsed, context); + } + }; + } + + @Override + public void activate(Context context) + { + + } + + @Override + public void deactivate() + { + + } + + /** + * Parses one CSV record into an instance of clazz. + * + * @return the populated POJO, or null if the record could not be read or did not + * match the configured fields/processors + */ + @Override + public Object convert(String tuple) + { + try { + csvStringReader.open(tuple); + return csvReader.read(clazz, nameMapping, processors); + } catch (IOException | org.supercsv.exception.SuperCsvException e) { + // SuperCsvException is unchecked: without catching it, one malformed record would + // fail the operator instead of returning null like other conversion failures + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + return null; + } + } + + @Override + public void teardown() + { + try { + if (csvReader != null) { + csvReader.close(); + } + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + public static class Field + { + String name; + String format; + FIELD_TYPE type; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public FIELD_TYPE getType() + { + return type; + } + + public void setType(String type) + { + this.type = FIELD_TYPE.valueOf(type); + } + + public String getFormat() + { + return format; + } + + public void setFormat(String format) + { + this.format = format; + } + + } + + /** + * Gets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @return An array list of Fields. + */ + public ArrayList<Field> getFields() + { + return fields; + } + + /** + * Sets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @param fields + * An array list of Fields. + */ + public void setFields(ArrayList<Field> fields) + { + this.fields = fields; + } + + /** + * Gets the delimiter which separates fields in incoming data. + * + * @return fieldDelimiter + */ + public int getFieldDelimiter() + { + return fieldDelimiter; + } + + /** + * Sets the delimiter which separates fields in incoming data.
+ * + * @param fieldDelimiter + */ + public void setFieldDelimiter(int fieldDelimiter) + { + this.fieldDelimiter = fieldDelimiter; + } + + /** + * Gets the delimiter which separates lines in incoming data. + * + * @return lineDelimiter + */ + public String getLineDelimiter() + { + return lineDelimiter; + } + + /** + * Sets the delimiter which separates line in incoming data. + * + * @param lineDelimiter + */ + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + /** + * Gets the name of the fields with type and format ( for date ) as comma + * separated string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @return fieldInfo + */ + public String getFieldInfo() + { + return fieldInfo; + } + + /** + * Sets the name of the fields with type and format ( for date ) as comma + * separated string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @param fieldInfo + */ + public void setFieldInfo(String fieldInfo) + { + this.fieldInfo = fieldInfo; + } + + private static final Logger logger = LoggerFactory.getLogger(CsvParser.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java new file mode 100644 index 0000000..db45b33 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.schema.parser; + +import java.io.IOException; +import java.text.SimpleDateFormat; + +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts JSON string to Pojo <br> + * <b>Properties</b> <br> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy + * + * @displayName JsonParser + * @category Parsers + * @tags json pojo parser + */ [email protected] +public class JsonParser extends Parser<String> +{ + + private transient ObjectReader reader; + protected String dateFormat; + + @Override + public void activate(Context context) + { + try { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + if (dateFormat != null) { + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); + } + reader = mapper.reader(clazz); + } catch (Throwable e) { + throw new RuntimeException("Unable find provided class"); + } + } + + @Override + public void deactivate() + { + } + + @Override + public Object convert(String 
tuple) + { + try { + return reader.readValue(tuple); + } catch (JsonProcessingException e) { + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + return null; + } + + /** + * Get the date format + * + * @return Date format string + */ + public String getDateFormat() + { + return dateFormat; + } + + /** + * Set the date format + * + * @param dateFormat + */ + public void setDateFormat(String dateFormat) + { + this.dateFormat = dateFormat; + } + + private static final Logger logger = LoggerFactory.getLogger(JsonParser.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java new file mode 100644 index 0000000..e5ff7f5 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.schema.parser; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.converter.Converter; + +/** + * Abstract class that implements Converter interface. This is a schema enabled + * Parser <br> + * Sub classes need to implement the convert method <br> + * <br> + * <b>Port Interface</b><br> + * <b>in</b>: expects <INPUT><br> + * <b>out</b>: emits <Object> this is a schema enabled port<br> + * <b>err</b>: emits <INPUT> error port that emits input tuple that could + * not be converted<br> + * <br> + * + * @displayName Parser + * @tags parser converter + * @param <INPUT> + */ [email protected] +public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>, + ActivationListener<Context> +{ + protected transient Class<?> clazz; + + @OutputPortFieldAnnotation(schemaRequired = true) + public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>() + { + public void setup(PortContext context) + { + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @OutputPortFieldAnnotation(optional = true) + public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>(); + + public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>() + { + @Override + public void process(INPUT inputTuple) + { + Object tuple = convert(inputTuple); + if (tuple == null && err.isConnected()) { + err.emit(inputTuple); + return; + } + if (out.isConnected()) { + out.emit(tuple); + } + } + }; + + /** + * Get the class that needs to be formatted + * + * @return Class<?> + */ + public 
Class<?> getClazz() + { + return clazz; + } + + /** + * Set the class of tuple that needs to be formatted + * + * @param clazz + */ + public void setClazz(Class<?> clazz) + { + this.clazz = clazz; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java new file mode 100644 index 0000000..4931497 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.schema.parser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.XStreamException; +import com.thoughtworks.xstream.converters.basic.DateConverter; + +import com.datatorrent.api.Context; + +/** + * Operator that converts XML string to Pojo <br> + * <b>Properties</b> <br> + * <b>alias</b>:This maps to the root element of the XML string. If not + * specified, parser would expect the root element to be fully qualified name of + * the Pojo Class. <br> + * <b>dateFormats</b>: Comma separated string of date formats e.g + * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default + * + * @displayName XmlParser + * @category Parsers + * @tags xml pojo parser + */ [email protected] +public class XmlParser extends Parser<String> +{ + + private transient XStream xstream; + protected String alias; + protected String dateFormats; + + public XmlParser() + { + alias = null; + dateFormats = null; + } + + @Override + public void activate(Context context) + { + xstream = new XStream(); + if (alias != null) { + try { + xstream.alias(alias, clazz); + } catch (Throwable e) { + throw new RuntimeException("Unable find provided class"); + } + } + if (dateFormats != null) { + String[] dateFormat = dateFormats.split(","); + xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat)); + } + } + + @Override + public void deactivate() + { + + } + + @Override + public Object convert(String tuple) + { + try { + return xstream.fromXML(tuple); + } catch (XStreamException e) { + logger.debug("Error while converting tuple {} {}", tuple,e.getMessage()); + return null; + } + } + + /** + * Gets the alias + * + * @return alias. + */ + public String getAlias() + { + return alias; + } + + /** + * Sets the alias This maps to the root element of the XML string. 
If not + * specified, parser would expect the root element to be fully qualified name + * of the Pojo Class. + * + * @param alias + * . + */ + public void setAlias(String alias) + { + this.alias = alias; + } + + /** + * Gets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy + * where first one would be considered default + * + * @return dateFormats. + */ + public String getDateFormats() + { + return dateFormats; + } + + /** + * Sets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy + * where first one would be considered default + * + * @param dateFormats + * . + */ + public void setDateFormats(String dateFormats) + { + this.dateFormats = dateFormats; + } + + private static final Logger logger = LoggerFactory.getLogger(XmlParser.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java new file mode 100644 index 0000000..8ecc088 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java @@ -0,0 +1,147 @@ +package com.datatorrent.contrib.schema.formatter; + +import java.util.Date; + +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.contrib.schema.formatter.CsvFormatter; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class CsvFormatterTest +{ + + CsvFormatter operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + @Rule + public Watcher watcher = new 
Watcher(); + + public class Watcher extends TestWatcher + { + + @Override + protected void starting(Description description) + { + super.starting(description); + operator = new CsvFormatter(); + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); + operator.setLineDelimiter("\r\n"); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + operator.teardown(); + } + + } + + @Test + public void testPojoReaderToCsv() + { + operator.setup(null); + EmployeeBean emp = new EmployeeBean(); + emp.setName("john"); + emp.setDept("cs"); + emp.setEid(1); + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); + operator.in.process(emp); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String csvOp = (String)validDataSink.collectedTuples.get(0); + Assert.assertNotNull(csvOp); + Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp); + } + + @Test + public void testPojoReaderToCsvMultipleDate() + { + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); + operator.setup(null); + EmployeeBean emp = new EmployeeBean(); + emp.setName("john"); + emp.setDept("cs"); + emp.setEid(1); + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); + emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); + operator.in.process(emp); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String csvOp = (String)validDataSink.collectedTuples.get(0); + Assert.assertNotNull(csvOp); + Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + 
operator.getLineDelimiter(), csvOp); + } + + public static class EmployeeBean + { + + private String name; + private String dept; + private int eid; + private Date dateOfJoining; + private Date dateOfBirth; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getDept() + { + return dept; + } + + public void setDept(String dept) + { + this.dept = dept; + } + + public int getEid() + { + return eid; + } + + public void setEid(int eid) + { + this.eid = eid; + } + + public Date getDateOfJoining() + { + return dateOfJoining; + } + + public void setDateOfJoining(Date dateOfJoining) + { + this.dateOfJoining = dateOfJoining; + } + + public Date getDateOfBirth() + { + return dateOfBirth; + } + + public void setDateOfBirth(Date dateOfBirth) + { + this.dateOfBirth = dateOfBirth; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java new file mode 100644 index 0000000..4040c63 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java @@ -0,0 +1,186 @@ +package com.datatorrent.contrib.schema.formatter; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Date; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.Description; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; +import 
com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.TestUtils.TestInfo; +import com.google.common.collect.Lists; + +public class JsonFormatterTest +{ + JsonFormatter operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + + public JsonFormatterTest() + { + // So that the output is cleaner. + System.setErr(new PrintStream(myOut)); + } + + @Rule + public TestInfo testMeta = new FSTestWatcher() + { + private void deleteDirectory() + { + try { + FileUtils.deleteDirectory(new File(getDir())); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void starting(Description descriptor) + { + super.starting(descriptor); + deleteDirectory(); + + operator = new JsonFormatter(); + + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + operator.setup(null); + operator.activate(null); + + operator.beginWindow(0); + } + + @Override + protected void finished(Description description) + { + operator.endWindow(); + operator.teardown(); + + deleteDirectory(); + super.finished(description); + } + }; + + @Test + public void testJSONToPOJO() + { + Test1Pojo pojo = new Test1Pojo(); + pojo.a = 123; + pojo.b = 234876274; + pojo.c = "HowAreYou?"; + pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); + + operator.in.put(pojo); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}"; + Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJODate() + { + 
Test1Pojo pojo = new Test1Pojo(); + pojo.a = 123; + pojo.b = 234876274; + pojo.c = "HowAreYou?"; + pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); + pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate(); + operator.setDateFormat("dd-MM-yyyy"); + operator.setup(null); + operator.activate(null); + operator.in.put(pojo); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; + Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJONullFields() + { + Test1Pojo pojo = new Test1Pojo(); + pojo.a = 123; + pojo.b = 234876274; + pojo.c = "HowAreYou?"; + pojo.d = null; + + operator.in.put(pojo); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}"; + Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJOEmptyPOJO() + { + Test1Pojo pojo = new Test1Pojo(); + operator.in.put(pojo); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; + System.out.println(validDataSink.collectedTuples.get(0)); + Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJONullPOJO() + { + operator.in.put(null); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expectedJSONString = "null"; + Assert.assertEquals(expectedJSONString, 
validDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJONoFieldPOJO() + { + operator.endWindow(); + operator.teardown(); + operator.setClazz(Test2Pojo.class); + operator.setup(null); + operator.beginWindow(1); + + Test2Pojo o = new Test2Pojo(); + operator.in.put(o); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0)); + } + + public static class Test1Pojo + { + public int a; + public long b; + public String c; + public List<String> d; + public Date date; + + @Override + public String toString() + { + return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; + } + } + + public static class Test2Pojo + { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java new file mode 100644 index 0000000..2bc1aec --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java @@ -0,0 +1,226 @@ +package com.datatorrent.contrib.schema.formatter; + +import java.util.Date; + +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.contrib.schema.formatter.XmlFormatter; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class XmlFormatterTest +{ + + XmlFormatter operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + @Rule + public 
Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + + @Override + protected void starting(Description description) + { + super.starting(description); + operator = new XmlFormatter(); + operator.setClazz(EmployeeBean.class); + operator.setDateFormat("yyyy-MM-dd"); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + operator.teardown(); + } + + } + + @Test + public void testPojoToXmlWithoutAlias() + { + EmployeeBean e = new EmployeeBean(); + e.setName("john"); + e.setEid(1); + e.setDept("cs"); + e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); + + operator.setup(null); + operator.activate(null); + operator.in.process(e); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>" + + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testXmlToPojoWithAlias() + { + EmployeeBean e = new EmployeeBean(); + e.setName("john"); + e.setEid(1); + e.setDept("cs"); + e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); + + operator.setAlias("EmployeeBean"); + operator.setup(null); + operator.activate(null); + operator.in.process(e); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expected = "<EmployeeBean>" + 
"<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>" + + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testXmlToPojoWithPrettyPrint() + { + EmployeeBean e = new EmployeeBean(); + e.setName("john"); + e.setEid(1); + e.setDept("cs"); + e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); + + operator.setAlias("EmployeeBean"); + operator.setPrettyPrint(true); + operator.setup(null); + operator.activate(null); + operator.in.process(e); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " <dept>cs</dept>\n" + " <eid>1</eid>\n" + + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>"; + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); + } + + @Test + public void testPojoToXmlWithoutAliasHeirarchical() + { + EmployeeBean e = new EmployeeBean(); + e.setName("john"); + e.setEid(1); + e.setDept("cs"); + e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); + Address address = new Address(); + address.setCity("new york"); + address.setCountry("US"); + e.setAddress(address); + + operator.setup(null); + operator.activate(null); + operator.in.process(e); + System.out.println(validDataSink.collectedTuples.get(0)); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>" + + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" + + "<city>new york</city>" + "<country>US</country>" + "</address>" + + 
"</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); + } + + public static class EmployeeBean + { + + private String name; + private String dept; + private int eid; + private Date dateOfJoining; + private Address address; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getDept() + { + return dept; + } + + public void setDept(String dept) + { + this.dept = dept; + } + + public int getEid() + { + return eid; + } + + public void setEid(int eid) + { + this.eid = eid; + } + + public Date getDateOfJoining() + { + return dateOfJoining; + } + + public void setDateOfJoining(Date dateOfJoining) + { + this.dateOfJoining = dateOfJoining; + } + + public Address getAddress() + { + return address; + } + + public void setAddress(Address address) + { + this.address = address; + } + } + + public static class Address + { + + private String city; + private String country; + + public String getCity() + { + return city; + } + + public void setCity(String city) + { + this.city = city; + } + + public String getCountry() + { + return country; + } + + public void setCountry(String country) + { + this.country = country; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java new file mode 100644 index 0000000..3c31ad0 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java @@ -0,0 +1,172 @@ +package com.datatorrent.contrib.schema.parser; + +import java.util.Date; + +import org.joda.time.DateTime; +import 
org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.contrib.schema.parser.CsvParser; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class CsvParserTest +{ + + CsvParser operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + + @Override + protected void starting(Description description) + { + super.starting(description); + operator = new CsvParser(); + operator.setClazz(EmployeeBean.class); + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + operator.teardown(); + } + + } + + @Test + public void testCsvToPojoWriterDefault() + { + operator.setup(null); + String tuple = "john,cs,1,01/01/2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + } + + @Test + public void testCsvToPojoWriterDateFormat() + { + 
operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); + operator.setup(null); + String tuple = "john,cs,1,01-JAN-2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + } + + @Test + public void testCsvToPojoWriterDateFormatMultiple() + { + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); + operator.setup(null); + String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfBirth())); + } + + public static class EmployeeBean + { + + private String name; + private String dept; + private int eid; + private Date dateOfJoining; + private Date dateOfBirth; + + public String getName() + 
{ + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getDept() + { + return dept; + } + + public void setDept(String dept) + { + this.dept = dept; + } + + public int getEid() + { + return eid; + } + + public void setEid(int eid) + { + this.eid = eid; + } + + public Date getDateOfJoining() + { + return dateOfJoining; + } + + public void setDateOfJoining(Date dateOfJoining) + { + this.dateOfJoining = dateOfJoining; + } + + public Date getDateOfBirth() + { + return dateOfBirth; + } + + public void setDateOfBirth(Date dateOfBirth) + { + this.dateOfBirth = dateOfBirth; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java new file mode 100644 index 0000000..b453508 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java @@ -0,0 +1,212 @@ +package com.datatorrent.contrib.schema.parser; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Date; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.Description; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.TestUtils.TestInfo; + +public class JsonParserTest +{ + JsonParser operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + final 
ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + + public JsonParserTest() + { + // So that the output is cleaner. + System.setErr(new PrintStream(myOut)); + } + + @Rule + public TestInfo testMeta = new FSTestWatcher() + { + private void deleteDirectory() + { + try { + FileUtils.deleteDirectory(new File(getDir())); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void starting(Description descriptor) + { + + super.starting(descriptor); + deleteDirectory(); + + operator = new JsonParser(); + operator.setClazz(Test1Pojo.class); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + operator.setup(null); + operator.activate(null); + + operator.beginWindow(0); + } + + @Override + protected void finished(Description description) + { + operator.endWindow(); + operator.teardown(); + + deleteDirectory(); + super.finished(description); + } + }; + + @Test + public void testJSONToPOJO() + { + String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; + operator.in.put(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Test1Pojo.class, obj.getClass()); + Test1Pojo pojo = (Test1Pojo)obj; + Assert.assertEquals(123, pojo.a); + Assert.assertEquals(234876274, pojo.b); + Assert.assertEquals("HowAreYou?", pojo.c); + Assert.assertEquals(3, pojo.d.size()); + Assert.assertEquals("ABC", pojo.d.get(0)); + Assert.assertEquals("PQR", pojo.d.get(1)); + Assert.assertEquals("XYZ", pojo.d.get(2)); + } + + @Test + public void testJSONToPOJODate() + { + String tuple = 
"{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; + operator.setDateFormat("dd-MM-yyyy"); + operator.setup(null); + operator.activate(null); + operator.in.put(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Test1Pojo.class, obj.getClass()); + Test1Pojo pojo = (Test1Pojo)obj; + Assert.assertEquals(123, pojo.a); + Assert.assertEquals(234876274, pojo.b); + Assert.assertEquals("HowAreYou?", pojo.c); + Assert.assertEquals(3, pojo.d.size()); + Assert.assertEquals("ABC", pojo.d.get(0)); + Assert.assertEquals("PQR", pojo.d.get(1)); + Assert.assertEquals("XYZ", pojo.d.get(2)); + Assert.assertEquals(2015, new DateTime(pojo.date).getYear()); + Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear()); + Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth()); + } + + @Test + public void testJSONToPOJOInvalidData() + { + String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}"; + operator.in.put(tuple); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJOUnknownFields() + { + String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}"; + operator.in.put(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Test1Pojo.class, obj.getClass()); + Test1Pojo pojo = (Test1Pojo)obj; + Assert.assertEquals(123, pojo.a); + Assert.assertEquals(234876274, pojo.b); + Assert.assertEquals("HowAreYou?", pojo.c); + 
Assert.assertEquals(null, pojo.d); + } + + @Test + public void testJSONToPOJOMismatchingFields() + { + String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; + operator.in.put(tuple); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJOEmptyString() + { + String tuple = ""; + operator.in.put(tuple); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); + } + + @Test + public void testJSONToPOJOEmptyJSON() + { + String tuple = "{}"; + operator.in.put(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Test1Pojo.class, obj.getClass()); + Test1Pojo pojo = (Test1Pojo)obj; + Assert.assertEquals(0, pojo.a); + Assert.assertEquals(0, pojo.b); + Assert.assertEquals(null, pojo.c); + Assert.assertEquals(null, pojo.d); + } + + @Test + public void testJSONToPOJOArrayInJson() + { + String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; + operator.in.put(tuple); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); + } + + public static class Test1Pojo + { + public int a; + public long b; + public String c; + public List<String> d; + public Date date; + + @Override + public String toString() + { + return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
new file mode 100644
index 0000000..4298951
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
@@ -0,0 +1,262 @@
+package com.datatorrent.contrib.schema.parser;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link XmlParser}: XML tuples that can be converted are emitted on {@code out} as
+ * {@link EmployeeBean} POJOs; tuples that cannot be converted are emitted unchanged on {@code err}.
+ */
+public class XmlParserTest
+{
+  XmlParser operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /**
+   * Creates a fresh operator wired to fresh sinks before each test and tears it down afterwards.
+   * Non-static because it assigns the enclosing test instance's fields.
+   */
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new XmlParser();
+      operator.setClazz(EmployeeBean.class);
+      operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      // Undo starting() in reverse order: tear the operator down before the base watcher finishes.
+      operator.teardown();
+      super.finished(description);
+    }
+
+  }
+
+  @Test
+  public void testXmlToPojoWithoutAlias()
+  {
+    String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
+        + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
+
+    processTuple(tuple);
+    EmployeeBean pojo = assertSingleValidEmployee("john", "cs", 1);
+    assertDateOfJoining(pojo, 2015, 1, 1);
+  }
+
+  @Test
+  public void testXmlToPojoWithAliasDateFormat()
+  {
+    String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + "</EmployeeBean>";
+
+    operator.setAlias("EmployeeBean");
+    // "2015-JAN-01" fits only the second pattern, so this exercises falling back to a later format.
+    operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd");
+    processTuple(tuple);
+    EmployeeBean pojo = assertSingleValidEmployee("john", "cs", 1);
+    assertDateOfJoining(pojo, 2015, 1, 1);
+  }
+
+  @Test
+  public void testXmlToPojoWithAlias()
+  {
+    String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
+
+    operator.setAlias("EmployeeBean");
+    processTuple(tuple);
+    EmployeeBean pojo = assertSingleValidEmployee("john", "cs", 1);
+    assertDateOfJoining(pojo, 2015, 1, 1);
+  }
+
+  @Test
+  public void testXmlToPojoIncorrectXML()
+  {
+    // NOTE(review): the date value does not exactly match the configured "yyyy-MM-dd" pattern either,
+    // so this tuple may be rejected for more than one reason; a well-formed date would isolate the
+    // unknown-field case this test is named for.
+    String tuple = "<EmployeeBean>"
+        + "<firstname>john</firstname>" //incorrect field name
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>"
+        + "</EmployeeBean>";
+
+    operator.setAlias("EmployeeBean");
+    processTuple(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testXmlToPojoWithoutAliasHierarchical()
+  {
+    String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>"
+        + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
+
+    processTuple(tuple);
+    EmployeeBean pojo = assertSingleValidEmployee("john", "cs", 1);
+    Assert.assertNotNull(pojo.getAddress());
+    Assert.assertEquals(Address.class, pojo.getAddress().getClass());
+    Assert.assertEquals("new york", pojo.getAddress().getCity());
+    Assert.assertEquals("US", pojo.getAddress().getCountry());
+    assertDateOfJoining(pojo, 2015, 1, 1);
+  }
+
+  /** Takes the operator through setup and activation, then feeds it a single tuple. */
+  private void processTuple(String tuple)
+  {
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(tuple);
+  }
+
+  /**
+   * Asserts that exactly one tuple reached {@code out}, none reached {@code err}, and the emitted
+   * tuple is an {@link EmployeeBean} with the given scalar fields.
+   *
+   * @return the emitted bean, for further field checks
+   */
+  private EmployeeBean assertSingleValidEmployee(String name, String dept, int eid)
+  {
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(EmployeeBean.class, obj.getClass());
+    EmployeeBean pojo = (EmployeeBean)obj;
+    Assert.assertEquals(name, pojo.getName());
+    Assert.assertEquals(dept, pojo.getDept());
+    Assert.assertEquals(eid, pojo.getEid());
+    return pojo;
+  }
+
+  /**
+   * Asserts the calendar date of {@code dateOfJoining} in the JVM default time zone. The null check
+   * matters: Joda's {@code new DateTime(null)} means "now", which would hide a missing date behind a
+   * confusing year mismatch.
+   */
+  private static void assertDateOfJoining(EmployeeBean pojo, int year, int month, int day)
+  {
+    Assert.assertNotNull(pojo.getDateOfJoining());
+    DateTime joined = new DateTime(pojo.getDateOfJoining());
+    Assert.assertEquals(year, joined.getYear());
+    Assert.assertEquals(month, joined.getMonthOfYear());
+    Assert.assertEquals(day, joined.getDayOfMonth());
+  }
+
+  /** POJO the parser is configured to produce in every test. */
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Address address;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Address getAddress()
+    {
+      return address;
+    }
+
+    public void setAddress(Address address)
+    {
+      this.address = address;
+    }
+  }
+
+  /** Nested value object; exercised by the hierarchical-XML test. */
+  public static class Address
+  {
+
+    private String city;
+    private String country;
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+
+    public String getCountry()
+    {
+      return country;
+    }
+
+    public void setCountry(String country)
+    {
+      this.country = country;
+    }
+  }
+
+}
