This does not pass apache-rat:check; see the Travis log: https://api.travis-ci.org/jobs/85458770/log.txt?deansi=true
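The test classes flagged below appear to be missing the standard ASF license header that the new main classes in this same commit already carry; adding it to each of the six files should clear the rat failure. For reference, this is the header used on the main sources in the diff below:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

The failure should also reproduce locally with something like mvn apache-rat:check before pushing.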
*******************************
Unapproved licenses:

  src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
  src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
  src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
  src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
  src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
  src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java

On Wed, Oct 14, 2015 at 4:48 AM, <[email protected]> wrote:

> Repository: incubator-apex-malhar
> Updated Branches:
>   refs/heads/devel-3 e1a45507b -> 3f4fe1866
>
>
> MLHR-1838 Added pojo parsers and formatters(csv,json,xml)
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3f4fe186
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3f4fe186
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3f4fe186
>
> Branch: refs/heads/devel-3
> Commit: 3f4fe18665c59dadb8ad289f696df983bdc451ce
> Parents: e1a4550
> Author: shubham <[email protected]>
> Authored: Fri Sep 11 16:26:03 2015 +0530
> Committer: shubham <[email protected]>
> Committed: Wed Oct 14 10:53:12 2015 +0530
>
> ----------------------------------------------------------------------
>  contrib/pom.xml                                 |  12 +
>  .../contrib/converter/Converter.java            |  43 +++
>  .../contrib/schema/formatter/CsvFormatter.java  | 285 +++++++++++++++++
>  .../contrib/schema/formatter/Formatter.java     | 101 ++++++
>  .../contrib/schema/formatter/JsonFormatter.java | 109 +++++++
>  .../contrib/schema/formatter/XmlFormatter.java  | 172 ++++++++++
>  .../contrib/schema/parser/CsvParser.java        | 314 +++++++++++++++++++
>  .../contrib/schema/parser/JsonParser.java       | 106 +++++++
>  .../contrib/schema/parser/Parser.java           | 102 ++++++
>  .../contrib/schema/parser/XmlParser.java        | 141 +++++++++
>  .../schema/formatter/CsvFormatterTest.java      | 147 +++++++++
>  .../schema/formatter/JsonFormatterTest.java     | 186 +++++++++++
>  .../schema/formatter/XmlFormatterTest.java      | 226 +++++++++++++
>  .../contrib/schema/parser/CsvParserTest.java    | 172 ++++++++++
>  .../contrib/schema/parser/JsonParserTest.java   | 212 +++++++++++++
>  .../contrib/schema/parser/XmlParserTest.java    | 254 +++++++++++++++
>  16 files changed, 2582 insertions(+)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/pom.xml
> ----------------------------------------------------------------------
> diff --git a/contrib/pom.xml b/contrib/pom.xml
> index abed040..91ef5c7 100755
> --- a/contrib/pom.xml
> +++ b/contrib/pom.xml
> @@ -606,5 +606,17 @@
>        <version>${dt.framework.version}</version>
>        <type>jar</type>
>      </dependency>
> +    <dependency>
> +      <!-- required by Xml parser and formatter -->
> +      <groupId>com.thoughtworks.xstream</groupId>
> +      <artifactId>xstream</artifactId>
> +      <version>1.4.8</version>
> +    </dependency>
> +    <dependency>
> +      <!-- required by Csv parser and formatter -->
> +      <groupId>net.sf.supercsv</groupId>
> +      <artifactId>super-csv-joda</artifactId>
> +      <version>2.3.1</version>
> +    </dependency>
>    </dependencies>
> </project>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
> ----------------------------------------------------------------------
> diff --git
a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java > b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java > new file mode 100644 > index 0000000..ebf2925 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java > @@ -0,0 +1,43 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.converter; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +/** > + * Operators that are converting tuples from one format to another must > + * implement this interface. Eg. Parsers or formatters , that parse data > of > + * certain format and convert them to another format. > + * > + * @param <INPUT> > + * @param <OUTPUT> > + */ > [email protected] > +public interface Converter<INPUT, OUTPUT> > +{ > + /** > + * Provide the implementation for converting tuples from one format to > the > + * other > + * > + * @param INPUT > + * tuple of certain format > + * @return OUTPUT tuple of converted format > + */ > + public OUTPUT convert(INPUT tuple); > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java > new file mode 100644 > index 0000000..924acc6 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java > @@ -0,0 +1,285 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.io.IOException; > +import java.io.StringWriter; > +import java.util.ArrayList; > + > +import javax.validation.constraints.NotNull; > + > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > +import org.supercsv.cellprocessor.FmtDate; > +import org.supercsv.cellprocessor.Optional; > +import org.supercsv.cellprocessor.ift.CellProcessor; > +import org.supercsv.exception.SuperCsvException; > +import org.supercsv.io.CsvBeanWriter; > +import org.supercsv.io.ICsvBeanWriter; > +import org.supercsv.prefs.CsvPreference; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.netlet.util.DTThrowable; > + > +/** > + * Operator that converts POJO to CSV string <br> > + * Assumption is that each field in the delimited data should map to a > simple > + * java type.<br> > + * <br> > + * <b>Properties</b> <br> > + * <b>fieldInfo</b>:User need to specify fields and their types as a comma > + * separated string having format > <NAME>:<TYPE>|<FORMAT> in > + * the same order as incoming data. FORMAT refers to dates with > dd/mm/yyyy as > + * default e.g > name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy > + * > + * @displayName CsvFormatter > + * @category Formatter > + * @tags pojo csv formatter > + */ > [email protected] > +public class CsvFormatter extends Formatter<String> > +{ > + > + private ArrayList<Field> fields; > + @NotNull > + protected String classname; > + @NotNull > + protected int fieldDelimiter; > + protected String lineDelimiter; > + > + @NotNull > + protected String fieldInfo; > + > + public enum FIELD_TYPE > + { > + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE > + }; > + > + protected transient String[] nameMapping; > + protected transient CellProcessor[] processors; > + protected transient CsvPreference preference; > + > + public CsvFormatter() > + { > + fields = new ArrayList<Field>(); > + fieldDelimiter = ','; > + lineDelimiter = "\r\n"; > + > + } > + > + @Override > + public void setup(Context.OperatorContext context) > + { > + super.setup(context); > + > + //fieldInfo information > + fields = new ArrayList<Field>(); > + String[] fieldInfoTuple = fieldInfo.split(","); > + for (int i = 0; i < fieldInfoTuple.length; i++) { > + String[] fieldTuple = fieldInfoTuple[i].split(":"); > + Field field = new Field(); > + field.setName(fieldTuple[0]); > + String[] typeFormat = fieldTuple[1].split("\\|"); > + field.setType(typeFormat[0].toUpperCase()); > + if (typeFormat.length > 1) { > + field.setFormat(typeFormat[1]); > + } > + getFields().add(field); > + } > + preference = new CsvPreference.Builder('"', fieldDelimiter, > lineDelimiter).build(); > + int countKeyValue = getFields().size(); > + nameMapping = new String[countKeyValue]; > + processors = new CellProcessor[countKeyValue]; > + initialise(nameMapping, processors); > + > + } > + > + private void initialise(String[] nameMapping, CellProcessor[] > processors) > + { > + for (int i = 0; i < getFields().size(); i++) { > + FIELD_TYPE type = getFields().get(i).type; > + nameMapping[i] = getFields().get(i).name; > + if (type == FIELD_TYPE.DATE) { > + String dateFormat = getFields().get(i).format; > + processors[i] = new Optional(new FmtDate(dateFormat == null ? 
> "dd/MM/yyyy" : dateFormat)); > + } else { > + processors[i] = new Optional(); > + } > + } > + > + } > + > + @Override > + public void activate(Context context) > + { > + > + } > + > + @Override > + public void deactivate() > + { > + > + } > + > + @Override > + public String convert(Object tuple) > + { > + try { > + StringWriter stringWriter = new StringWriter(); > + ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, > preference); > + beanWriter.write(tuple, nameMapping, processors); > + beanWriter.flush(); > + beanWriter.close(); > + return stringWriter.toString(); > + } catch (SuperCsvException e) { > + logger.debug("Error while converting tuple {} > {}",tuple,e.getMessage()); > + } catch (IOException e) { > + DTThrowable.rethrow(e); > + } > + return null; > + } > + > + public static class Field > + { > + String name; > + String format; > + FIELD_TYPE type; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public FIELD_TYPE getType() > + { > + return type; > + } > + > + public void setType(String type) > + { > + this.type = FIELD_TYPE.valueOf(type); > + } > + > + public String getFormat() > + { > + return format; > + } > + > + public void setFormat(String format) > + { > + this.format = format; > + } > + } > + > + /** > + * Gets the array list of the fields, a field being a POJO containing > the name > + * of the field and type of field. > + * > + * @return An array list of Fields. > + */ > + public ArrayList<Field> getFields() > + { > + return fields; > + } > + > + /** > + * Sets the array list of the fields, a field being a POJO containing > the name > + * of the field and type of field. > + * > + * @param fields > + * An array list of Fields. > + */ > + public void setFields(ArrayList<Field> fields) > + { > + this.fields = fields; > + } > + > + /** > + * Gets the delimiter which separates fields in incoming data. > + * > + * @return fieldDelimiter > + */ > + public int getFieldDelimiter() > + { > + return fieldDelimiter; > + } > + > + /** > + * Sets the delimiter which separates fields in incoming data. > + * > + * @param fieldDelimiter > + */ > + public void setFieldDelimiter(int fieldDelimiter) > + { > + this.fieldDelimiter = fieldDelimiter; > + } > + > + /** > + * Gets the delimiter which separates lines in incoming data. > + * > + * @return lineDelimiter > + */ > + public String getLineDelimiter() > + { > + return lineDelimiter; > + } > + > + /** > + * Sets the delimiter which separates line in incoming data. > + * > + * @param lineDelimiter > + */ > + public void setLineDelimiter(String lineDelimiter) > + { > + this.lineDelimiter = lineDelimiter; > + } > + > + /** > + * Gets the name of the fields with type and format in data as comma > separated > + * string in same order as incoming data. e.g > + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy > + * > + * @return fieldInfo > + */ > + public String getFieldInfo() > + { > + return fieldInfo; > + } > + > + /** > + * Sets the name of the fields with type and format in data as comma > separated > + * string in same order as incoming data. 
e.g > + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy > + * > + * @param fieldInfo > + */ > + public void setFieldInfo(String fieldInfo) > + { > + this.fieldInfo = fieldInfo; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(CsvFormatter.class); > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java > new file mode 100644 > index 0000000..19a78e0 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java > @@ -0,0 +1,101 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.schema.formatter; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.api.Context.PortContext; > +import com.datatorrent.api.DefaultInputPort; > +import com.datatorrent.api.DefaultOutputPort; > +import com.datatorrent.api.Operator.ActivationListener; > +import com.datatorrent.api.annotation.InputPortFieldAnnotation; > +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; > +import com.datatorrent.common.util.BaseOperator; > +import com.datatorrent.contrib.converter.Converter; > + > +/** > + * Abstract class that implements Converter interface. 
This is a schema > enabled > + * Formatter <br> > + * Sub classes need to implement the convert method <br> > + * <b>Port Interface</b><br> > + * <b>in</b>: expects <Object> this is a schema enabled port<br> > + * <b>out</b>: emits <OUTPUT> <br> > + * <b>err</b>: emits <Object> error port that emits input tuple > that could > + * not be converted<br> > + * <br> > + * > + * @displayName Parser > + * @tags parser converter > + * @param <INPUT> > + */ > [email protected] > +public abstract class Formatter<OUTPUT> extends BaseOperator implements > Converter<Object, OUTPUT>, > + ActivationListener<Context> > +{ > + protected transient Class<?> clazz; > + > + @OutputPortFieldAnnotation > + public transient DefaultOutputPort<OUTPUT> out = new > DefaultOutputPort<OUTPUT>(); > + > + @OutputPortFieldAnnotation(optional = true) > + public transient DefaultOutputPort<Object> err = new > DefaultOutputPort<Object>(); > + > + @InputPortFieldAnnotation(schemaRequired = true) > + public transient DefaultInputPort<Object> in = new > DefaultInputPort<Object>() > + { > + public void setup(PortContext context) > + { > + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); > + } > + > + @Override > + public void process(Object inputTuple) > + { > + OUTPUT tuple = convert(inputTuple); > + if (tuple == null && err.isConnected()) { > + err.emit(inputTuple); > + return; > + } > + if (out.isConnected()) { > + out.emit(tuple); > + } > + } > + }; > + > + /** > + * Get the class that needs to be formatted > + * > + * @return Class<?> > + */ > + public Class<?> getClazz() > + { > + return clazz; > + } > + > + /** > + * Set the class of tuple that needs to be formatted > + * > + * @param clazz > + */ > + public void setClazz(Class<?> clazz) > + { > + this.clazz = clazz; > + } > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java > new file mode 100644 > index 0000000..344ac60 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java > @@ -0,0 +1,109 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.io.IOException; > +import java.text.SimpleDateFormat; > + > +import org.codehaus.jackson.JsonGenerationException; > +import org.codehaus.jackson.map.JsonMappingException; > +import org.codehaus.jackson.map.ObjectMapper; > +import org.codehaus.jackson.map.ObjectWriter; > +import org.codehaus.jackson.map.SerializationConfig; > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.netlet.util.DTThrowable; > + > +/** > + * Operator that converts POJO to JSON string <br> > + * <b>Properties</b> <br> > + * <b>dateFormat</b>: date format e.g dd/MM/yyyy > + * > + * @displayName JsonFormatter > + * @category Formatter > + * @tags pojo json formatter > + */ > [email protected] > +public class JsonFormatter extends Formatter<String> > +{ > + private transient ObjectWriter writer; > + protected String dateFormat; > + > + @Override > + public void activate(Context context) > + { > + try { > + ObjectMapper mapper = new ObjectMapper(); > + if (dateFormat != null) { > + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); > + } > + writer = mapper.writerWithType(clazz); > + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, > true); > + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, > true); > + > mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true); > + } catch (Throwable e) { > + throw new RuntimeException("Unable find provided class"); > + } > + } > + > + @Override > + public void deactivate() > + { > + > + } > + > + @Override > + public String convert(Object tuple) > + { > + try { > + return writer.writeValueAsString(tuple); > + } catch (JsonGenerationException | JsonMappingException e) { > + logger.debug("Error while converting tuple {} > {}",tuple,e.getMessage()); > + } catch (IOException e) { > + DTThrowable.rethrow(e); > + } > + return null; > + } > + > + /** > + * Get the date format > + * > + * @return Date format string > + */ > + public String getDateFormat() > + { > + return dateFormat; > + } > + > + /** > + * Set the date format > + * > + * @param dateFormat > + */ > + public void setDateFormat(String dateFormat) > + { > + this.dateFormat = dateFormat; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(JsonFormatter.class); > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java > new file mode 100644 > index 0000000..b387031 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java > @@ -0,0 +1,172 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.io.Writer; > + > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > + > +import com.thoughtworks.xstream.XStream; > +import com.thoughtworks.xstream.XStreamException; > +import com.thoughtworks.xstream.converters.basic.DateConverter; > +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; > +import com.thoughtworks.xstream.io.xml.CompactWriter; > +import com.thoughtworks.xstream.io.xml.XppDriver; > + > +/** > + * @displayName XmlParser > + * @category Formatter > + * @tags xml pojo formatter > + */ > [email protected] > +public class XmlFormatter extends Formatter<String> > +{ > + > + private transient XStream xstream; > + > + protected String alias; > + protected String dateFormat; > + protected boolean prettyPrint; > + > + public XmlFormatter() > + { > + alias = null; > + dateFormat = null; > + } > + > + @Override > + public void activate(Context context) > + { > + if (prettyPrint) { > + xstream = new XStream(); > + } else { > + xstream = new XStream(new XppDriver() > + { > + @Override > + public HierarchicalStreamWriter createWriter(Writer out) > + { > + return new CompactWriter(out, getNameCoder()); > + } > + }); > + } > + if (alias != null) { > + try { > + xstream.alias(alias, clazz); > + } catch (Throwable e) { > + throw new RuntimeException("Unable find provided class"); > + } > + } > + if (dateFormat != null) { > + xstream.registerConverter(new DateConverter(dateFormat, new > String[] {})); > + } > + } > + > + @Override > + public void deactivate() > + { > + > + } > + > + @Override > + public String convert(Object tuple) > + { > + try { > + return xstream.toXML(tuple); > + } catch (XStreamException e) { > + logger.debug("Error while converting tuple {} {} > ",tuple,e.getMessage()); > + return null; > + } > + } > + > + /** > + * Gets the alias This is an optional step. Without it XStream would > work > + * fine, but the XML element names would contain the fully qualified > name of > + * each class (including package) which would bulk up the XML a bit. > + * > + * @return alias. > + */ > + public String getAlias() > + { > + return alias; > + } > + > + /** > + * Sets the alias This is an optional step. Without it XStream would > work > + * fine, but the XML element names would contain the fully qualified > name of > + * each class (including package) which would bulk up the XML a bit. > + * > + * @param alias > + * . > + */ > + public void setAlias(String alias) > + { > + this.alias = alias; > + } > + > + /** > + * Gets the date format e.g dd/mm/yyyy - this will be how a date would > be > + * formatted > + * > + * @return dateFormat. > + */ > + public String getDateFormat() > + { > + return dateFormat; > + } > + > + /** > + * Sets the date format e.g dd/mm/yyyy - this will be how a date would > be > + * formatted > + * > + * @param dateFormat > + * . 
> + */ > + public void setDateFormat(String dateFormat) > + { > + this.dateFormat = dateFormat; > + } > + > + /** > + * Returns true if pretty print is enabled. > + * > + * @return prettyPrint > + */ > + public boolean isPrettyPrint() > + { > + return prettyPrint; > + } > + > + /** > + * Sets pretty print option. > + * > + * @param prettyPrint > + */ > + public void setPrettyPrint(boolean prettyPrint) > + { > + this.prettyPrint = prettyPrint; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(XmlFormatter.class); > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java > new file mode 100644 > index 0000000..4fd39fb > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java > @@ -0,0 +1,314 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.schema.parser; > + > +import java.io.IOException; > +import java.util.ArrayList; > + > +import javax.validation.constraints.NotNull; > + > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > +import org.supercsv.cellprocessor.Optional; > +import org.supercsv.cellprocessor.ParseBool; > +import org.supercsv.cellprocessor.ParseChar; > +import org.supercsv.cellprocessor.ParseDate; > +import org.supercsv.cellprocessor.ParseDouble; > +import org.supercsv.cellprocessor.ParseInt; > +import org.supercsv.cellprocessor.ParseLong; > +import org.supercsv.cellprocessor.ift.CellProcessor; > +import org.supercsv.io.CsvBeanReader; > +import org.supercsv.prefs.CsvPreference; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.api.Context.OperatorContext; > +import com.datatorrent.lib.util.ReusableStringReader; > +import com.datatorrent.netlet.util.DTThrowable; > + > +/** > + * Operator that converts CSV string to Pojo <br> > + * Assumption is that each field in the delimited data should map to a > simple > + * java type.<br> > + * <br> > + * <b>Properties</b> <br> > + * <b>fieldInfo</b>:User need to specify fields and their types as a comma > + * separated string having format > <NAME>:<TYPE>|<FORMAT> in > + * the same order as incoming data. 
FORMAT refers to dates with > dd/mm/yyyy as > + * default e.g > name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br> > + * <b>fieldDelimiter</b>: Default is comma <br> > + * <b>lineDelimiter</b>: Default is '\r\n' > + * > + * @displayName CsvParser > + * @category Parsers > + * @tags csv pojo parser > + */ > [email protected] > +public class CsvParser extends Parser<String> > +{ > + > + private ArrayList<Field> fields; > + @NotNull > + protected int fieldDelimiter; > + protected String lineDelimiter; > + > + @NotNull > + protected String fieldInfo; > + > + protected transient String[] nameMapping; > + protected transient CellProcessor[] processors; > + private transient CsvBeanReader csvReader; > + > + public enum FIELD_TYPE > + { > + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE > + }; > + > + @NotNull > + private transient ReusableStringReader csvStringReader = new > ReusableStringReader(); > + > + public CsvParser() > + { > + fields = new ArrayList<Field>(); > + fieldDelimiter = ','; > + lineDelimiter = "\r\n"; > + } > + > + @Override > + public void setup(OperatorContext context) > + { > + super.setup(context); > + > + logger.info("field info {}", fieldInfo); > + fields = new ArrayList<Field>(); > + String[] fieldInfoTuple = fieldInfo.split(","); > + for (int i = 0; i < fieldInfoTuple.length; i++) { > + String[] fieldTuple = fieldInfoTuple[i].split(":"); > + Field field = new Field(); > + field.setName(fieldTuple[0]); > + String[] typeFormat = fieldTuple[1].split("\\|"); > + field.setType(typeFormat[0].toUpperCase()); > + if (typeFormat.length > 1) { > + field.setFormat(typeFormat[1]); > + } > + getFields().add(field); > + } > + > + CsvPreference preference = new CsvPreference.Builder('"', > fieldDelimiter, lineDelimiter).build(); > + csvReader = new CsvBeanReader(csvStringReader, preference); > + int countKeyValue = getFields().size(); > + logger.info("countKeyValue {}", countKeyValue); > + nameMapping = new String[countKeyValue]; > + processors = new CellProcessor[countKeyValue]; > + initialise(nameMapping, processors); > + } > + > + private void initialise(String[] nameMapping, CellProcessor[] > processors) > + { > + for (int i = 0; i < getFields().size(); i++) { > + FIELD_TYPE type = getFields().get(i).type; > + nameMapping[i] = getFields().get(i).name; > + if (type == FIELD_TYPE.DOUBLE) { > + processors[i] = new Optional(new ParseDouble()); > + } else if (type == FIELD_TYPE.INTEGER) { > + processors[i] = new Optional(new ParseInt()); > + } else if (type == FIELD_TYPE.FLOAT) { > + processors[i] = new Optional(new ParseDouble()); > + } else if (type == FIELD_TYPE.LONG) { > + processors[i] = new Optional(new ParseLong()); > + } else if (type == FIELD_TYPE.SHORT) { > + processors[i] = new Optional(new ParseInt()); > + } else if (type == FIELD_TYPE.STRING) { > + processors[i] = new Optional(); > + } else if (type == FIELD_TYPE.CHARACTER) { > + processors[i] = new Optional(new ParseChar()); > + } else if (type == FIELD_TYPE.BOOLEAN) { > + processors[i] = new Optional(new ParseBool()); > + } else if (type == FIELD_TYPE.DATE) { > + String dateFormat = getFields().get(i).format; > + processors[i] = new Optional(new ParseDate(dateFormat == null ? 
> "dd/MM/yyyy" : dateFormat)); > + } > + } > + } > + > + @Override > + public void activate(Context context) > + { > + > + } > + > + @Override > + public void deactivate() > + { > + > + } > + > + @Override > + public Object convert(String tuple) > + { > + try { > + csvStringReader.open(tuple); > + return csvReader.read(clazz, nameMapping, processors); > + } catch (IOException e) { > + logger.debug("Error while converting tuple {} > {}",tuple,e.getMessage()); > + return null; > + } > + } > + > + @Override > + public void teardown() > + { > + try { > + if (csvReader != null) { > + csvReader.close(); > + } > + } catch (IOException e) { > + DTThrowable.rethrow(e); > + } > + } > + > + public static class Field > + { > + String name; > + String format; > + FIELD_TYPE type; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public FIELD_TYPE getType() > + { > + return type; > + } > + > + public void setType(String type) > + { > + this.type = FIELD_TYPE.valueOf(type); > + } > + > + public String getFormat() > + { > + return format; > + } > + > + public void setFormat(String format) > + { > + this.format = format; > + } > + > + } > + > + /** > + * Gets the array list of the fields, a field being a POJO containing > the name > + * of the field and type of field. > + * > + * @return An array list of Fields. > + */ > + public ArrayList<Field> getFields() > + { > + return fields; > + } > + > + /** > + * Sets the array list of the fields, a field being a POJO containing > the name > + * of the field and type of field. > + * > + * @param fields > + * An array list of Fields. > + */ > + public void setFields(ArrayList<Field> fields) > + { > + this.fields = fields; > + } > + > + /** > + * Gets the delimiter which separates fields in incoming data. > + * > + * @return fieldDelimiter > + */ > + public int getFieldDelimiter() > + { > + return fieldDelimiter; > + } > + > + /** > + * Sets the delimiter which separates fields in incoming data. > + * > + * @param fieldDelimiter > + */ > + public void setFieldDelimiter(int fieldDelimiter) > + { > + this.fieldDelimiter = fieldDelimiter; > + } > + > + /** > + * Gets the delimiter which separates lines in incoming data. > + * > + * @return lineDelimiter > + */ > + public String getLineDelimiter() > + { > + return lineDelimiter; > + } > + > + /** > + * Sets the delimiter which separates line in incoming data. > + * > + * @param lineDelimiter > + */ > + public void setLineDelimiter(String lineDelimiter) > + { > + this.lineDelimiter = lineDelimiter; > + } > + > + /** > + * Gets the name of the fields with type and format ( for date ) as > comma > + * separated string in same order as incoming data. e.g > + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy > + * > + * @return fieldInfo > + */ > + public String getFieldInfo() > + { > + return fieldInfo; > + } > + > + /** > + * Sets the name of the fields with type and format ( for date ) as > comma > + * separated string in same order as incoming data. 
e.g > + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy > + * > + * @param fieldInfo > + */ > + public void setFieldInfo(String fieldInfo) > + { > + this.fieldInfo = fieldInfo; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(CsvParser.class); > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java > new file mode 100644 > index 0000000..db45b33 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java > @@ -0,0 +1,106 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.schema.parser; > + > +import java.io.IOException; > +import java.text.SimpleDateFormat; > + > +import org.codehaus.jackson.JsonProcessingException; > +import org.codehaus.jackson.map.DeserializationConfig; > +import org.codehaus.jackson.map.ObjectMapper; > +import org.codehaus.jackson.map.ObjectReader; > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.netlet.util.DTThrowable; > + > +/** > + * Operator that converts JSON string to Pojo <br> > + * <b>Properties</b> <br> > + * <b>dateFormat</b>: date format e.g dd/MM/yyyy > + * > + * @displayName JsonParser > + * @category Parsers > + * @tags json pojo parser > + */ > [email protected] > +public class JsonParser extends Parser<String> > +{ > + > + private transient ObjectReader reader; > + protected String dateFormat; > + > + @Override > + public void activate(Context context) > + { > + try { > + ObjectMapper mapper = new ObjectMapper(); > + > mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, > false); > + if (dateFormat != null) { > + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); > + } > + reader = mapper.reader(clazz); > + } catch (Throwable e) { > + throw new RuntimeException("Unable find provided class"); > + } > + } > + > + @Override > + public void deactivate() > + { > + } > + > + @Override > + public Object convert(String tuple) > + { > + try { > + return reader.readValue(tuple); > + } catch (JsonProcessingException e) { > + logger.debug("Error while converting tuple {} > {}",tuple,e.getMessage()); > + } catch (IOException e) { > + DTThrowable.rethrow(e); > + } > + return null; > + } > + > + /** > + * Get the date format > 
+ * > + * @return Date format string > + */ > + public String getDateFormat() > + { > + return dateFormat; > + } > + > + /** > + * Set the date format > + * > + * @param dateFormat > + */ > + public void setDateFormat(String dateFormat) > + { > + this.dateFormat = dateFormat; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(JsonParser.class); > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java > new file mode 100644 > index 0000000..e5ff7f5 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java > @@ -0,0 +1,102 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package com.datatorrent.contrib.schema.parser; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.datatorrent.api.Context; > +import com.datatorrent.api.Context.PortContext; > +import com.datatorrent.api.DefaultInputPort; > +import com.datatorrent.api.DefaultOutputPort; > +import com.datatorrent.api.Operator.ActivationListener; > +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; > +import com.datatorrent.common.util.BaseOperator; > +import com.datatorrent.contrib.converter.Converter; > + > +/** > + * Abstract class that implements Converter interface. 
This is a schema > enabled > + * Parser <br> > + * Sub classes need to implement the convert method <br> > + * <br> > + * <b>Port Interface</b><br> > + * <b>in</b>: expects <INPUT><br> > + * <b>out</b>: emits <Object> this is a schema enabled port<br> > + * <b>err</b>: emits <INPUT> error port that emits input tuple that > could > + * not be converted<br> > + * <br> > + * > + * @displayName Parser > + * @tags parser converter > + * @param <INPUT> > + */ > [email protected] > +public abstract class Parser<INPUT> extends BaseOperator implements > Converter<INPUT, Object>, > + ActivationListener<Context> > +{ > + protected transient Class<?> clazz; > + > + @OutputPortFieldAnnotation(schemaRequired = true) > + public transient DefaultOutputPort<Object> out = new > DefaultOutputPort<Object>() > + { > + public void setup(PortContext context) > + { > + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); > + } > + }; > + > + @OutputPortFieldAnnotation(optional = true) > + public transient DefaultOutputPort<INPUT> err = new > DefaultOutputPort<INPUT>(); > + > + public transient DefaultInputPort<INPUT> in = new > DefaultInputPort<INPUT>() > + { > + @Override > + public void process(INPUT inputTuple) > + { > + Object tuple = convert(inputTuple); > + if (tuple == null && err.isConnected()) { > + err.emit(inputTuple); > + return; > + } > + if (out.isConnected()) { > + out.emit(tuple); > + } > + } > + }; > + > + /** > + * Get the class that needs to be formatted > + * > + * @return Class<?> > + */ > + public Class<?> getClazz() > + { > + return clazz; > + } > + > + /** > + * Set the class of tuple that needs to be formatted > + * > + * @param clazz > + */ > + public void setClazz(Class<?> clazz) > + { > + this.clazz = clazz; > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java > new file mode 100644 > index 0000000..4931497 > --- /dev/null > +++ > b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java > @@ -0,0 +1,141 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package com.datatorrent.contrib.schema.parser; > + > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import org.apache.hadoop.classification.InterfaceStability; > + > +import com.thoughtworks.xstream.XStream; > +import com.thoughtworks.xstream.XStreamException; > +import com.thoughtworks.xstream.converters.basic.DateConverter; > + > +import com.datatorrent.api.Context; > + > +/** > + * Operator that converts XML string to Pojo <br> > + * <b>Properties</b> <br> > + * <b>alias</b>:This maps to the root element of the XML string. If not > + * specified, parser would expect the root element to be fully qualified > name of > + * the Pojo Class. <br> > + * <b>dateFormats</b>: Comma separated string of date formats e.g > + * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default > + * > + * @displayName XmlParser > + * @category Parsers > + * @tags xml pojo parser > + */ > [email protected] > +public class XmlParser extends Parser<String> > +{ > + > + private transient XStream xstream; > + protected String alias; > + protected String dateFormats; > + > + public XmlParser() > + { > + alias = null; > + dateFormats = null; > + } > + > + @Override > + public void activate(Context context) > + { > + xstream = new XStream(); > + if (alias != null) { > + try { > + xstream.alias(alias, clazz); > + } catch (Throwable e) { > + throw new RuntimeException("Unable find provided class"); > + } > + } > + if (dateFormats != null) { > + String[] dateFormat = dateFormats.split(","); > + xstream.registerConverter(new DateConverter(dateFormat[0], > dateFormat)); > + } > + } > + > + @Override > + public void deactivate() > + { > + > + } > + > + @Override > + public Object convert(String tuple) > + { > + try { > + return xstream.fromXML(tuple); > + } catch (XStreamException e) { > + logger.debug("Error while converting tuple {} {}", > tuple,e.getMessage()); > + return null; > + } > + } > + > + /** > + * Gets the alias > + * > + * @return alias. > + */ > + public String getAlias() > + { > + return alias; > + } > + > + /** > + * Sets the alias This maps to the root element of the XML string. If > not > + * specified, parser would expect the root element to be fully > qualified name > + * of the Pojo Class. > + * > + * @param alias > + * . > + */ > + public void setAlias(String alias) > + { > + this.alias = alias; > + } > + > + /** > + * Gets the comma separated string of date formats e.g > dd/mm/yyyy,dd-mmm-yyyy > + * where first one would be considered default > + * > + * @return dateFormats. > + */ > + public String getDateFormats() > + { > + return dateFormats; > + } > + > + /** > + * Sets the comma separated string of date formats e.g > dd/mm/yyyy,dd-mmm-yyyy > + * where first one would be considered default > + * > + * @param dateFormats > + * . 
> + */ > + public void setDateFormats(String dateFormats) > + { > + this.dateFormats = dateFormats; > + } > + > + private static final Logger logger = > LoggerFactory.getLogger(XmlParser.class); > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java > new file mode 100644 > index 0000000..8ecc088 > --- /dev/null > +++ > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java > @@ -0,0 +1,147 @@ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.util.Date; > + > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.rules.TestWatcher; > +import org.junit.runner.Description; > + > +import com.datatorrent.contrib.schema.formatter.CsvFormatter; > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > + > +public class CsvFormatterTest > +{ > + > + CsvFormatter operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + @Rule > + public Watcher watcher = new Watcher(); > + > + public class Watcher extends TestWatcher > + { > + > + @Override > + protected void starting(Description description) > + { > + super.starting(description); > + operator = new CsvFormatter(); > + > operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); > + operator.setLineDelimiter("\r\n"); > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + } > + > + @Override > + protected void finished(Description description) > + { > + super.finished(description); > + operator.teardown(); > + } > + > + } > + > + @Test > + public void testPojoReaderToCsv() > + { > + operator.setup(null); > + EmployeeBean emp = new EmployeeBean(); > + emp.setName("john"); > + emp.setDept("cs"); > + emp.setEid(1); > + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); > + operator.in.process(emp); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String csvOp = (String)validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(csvOp); > + Assert.assertEquals("john,cs,1,01/01/2015" + > operator.getLineDelimiter(), csvOp); > + } > + > + @Test > + public void testPojoReaderToCsvMultipleDate() > + { > + > operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); > + operator.setup(null); > + EmployeeBean emp = new EmployeeBean(); > + emp.setName("john"); > + emp.setDept("cs"); > + emp.setEid(1); > + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); > + emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); > + operator.in.process(emp); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String csvOp = (String)validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(csvOp); > + 
Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + > operator.getLineDelimiter(), csvOp); > + } > + > + public static class EmployeeBean > + { > + > + private String name; > + private String dept; > + private int eid; > + private Date dateOfJoining; > + private Date dateOfBirth; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public String getDept() > + { > + return dept; > + } > + > + public void setDept(String dept) > + { > + this.dept = dept; > + } > + > + public int getEid() > + { > + return eid; > + } > + > + public void setEid(int eid) > + { > + this.eid = eid; > + } > + > + public Date getDateOfJoining() > + { > + return dateOfJoining; > + } > + > + public void setDateOfJoining(Date dateOfJoining) > + { > + this.dateOfJoining = dateOfJoining; > + } > + > + public Date getDateOfBirth() > + { > + return dateOfBirth; > + } > + > + public void setDateOfBirth(Date dateOfBirth) > + { > + this.dateOfBirth = dateOfBirth; > + } > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java > new file mode 100644 > index 0000000..4040c63 > --- /dev/null > +++ > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java > @@ -0,0 +1,186 @@ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.io.ByteArrayOutputStream; > +import java.io.File; > +import java.io.IOException; > +import java.io.PrintStream; > +import java.util.Date; > +import java.util.List; > + > +import org.apache.commons.io.FileUtils; > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.runner.Description; > + > +import > com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > +import com.datatorrent.lib.util.TestUtils.TestInfo; > +import com.google.common.collect.Lists; > + > +public class JsonFormatterTest > +{ > + JsonFormatter operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); > + > + public JsonFormatterTest() > + { > + // So that the output is cleaner. 
> + System.setErr(new PrintStream(myOut)); > + } > + > + @Rule > + public TestInfo testMeta = new FSTestWatcher() > + { > + private void deleteDirectory() > + { > + try { > + FileUtils.deleteDirectory(new File(getDir())); > + } catch (IOException ex) { > + throw new RuntimeException(ex); > + } > + } > + > + @Override > + protected void starting(Description descriptor) > + { > + super.starting(descriptor); > + deleteDirectory(); > + > + operator = new JsonFormatter(); > + > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + operator.setup(null); > + operator.activate(null); > + > + operator.beginWindow(0); > + } > + > + @Override > + protected void finished(Description description) > + { > + operator.endWindow(); > + operator.teardown(); > + > + deleteDirectory(); > + super.finished(description); > + } > + }; > + > + @Test > + public void testJSONToPOJO() > + { > + Test1Pojo pojo = new Test1Pojo(); > + pojo.a = 123; > + pojo.b = 234876274; > + pojo.c = "HowAreYou?"; > + pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); > + > + operator.in.put(pojo); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expectedJSONString = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}"; > + Assert.assertEquals(expectedJSONString, > validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJODate() > + { > + Test1Pojo pojo = new Test1Pojo(); > + pojo.a = 123; > + pojo.b = 234876274; > + pojo.c = "HowAreYou?"; > + pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); > + pojo.date = new > DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate(); > + operator.setDateFormat("dd-MM-yyyy"); > + operator.setup(null); > + operator.activate(null); > + operator.in.put(pojo); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expectedJSONString = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; > + Assert.assertEquals(expectedJSONString, > validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJONullFields() > + { > + Test1Pojo pojo = new Test1Pojo(); > + pojo.a = 123; > + pojo.b = 234876274; > + pojo.c = "HowAreYou?"; > + pojo.d = null; > + > + operator.in.put(pojo); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expectedJSONString = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}"; > + Assert.assertEquals(expectedJSONString, > validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJOEmptyPOJO() > + { > + Test1Pojo pojo = new Test1Pojo(); > + operator.in.put(pojo); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expectedJSONString = > "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; > + System.out.println(validDataSink.collectedTuples.get(0)); > + Assert.assertEquals(expectedJSONString, > validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJONullPOJO() > + { > + operator.in.put(null); > + 
Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expectedJSONString = "null"; > + Assert.assertEquals(expectedJSONString, > validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJONoFieldPOJO() > + { > + operator.endWindow(); > + operator.teardown(); > + operator.setClazz(Test2Pojo.class); > + operator.setup(null); > + operator.beginWindow(1); > + > + Test2Pojo o = new Test2Pojo(); > + operator.in.put(o); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0)); > + } > + > + public static class Test1Pojo > + { > + public int a; > + public long b; > + public String c; > + public List<String> d; > + public Date date; > + > + @Override > + public String toString() > + { > + return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + > ", date=" + date + "]"; > + } > + } > + > + public static class Test2Pojo > + { > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java > new file mode 100644 > index 0000000..2bc1aec > --- /dev/null > +++ > b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java > @@ -0,0 +1,226 @@ > +package com.datatorrent.contrib.schema.formatter; > + > +import java.util.Date; > + > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.rules.TestWatcher; > +import org.junit.runner.Description; > + > +import com.datatorrent.contrib.schema.formatter.XmlFormatter; > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > + > +public class XmlFormatterTest > +{ > + > + XmlFormatter operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + @Rule > + public Watcher watcher = new Watcher(); > + > + public class Watcher extends TestWatcher > + { > + > + @Override > + protected void starting(Description description) > + { > + super.starting(description); > + operator = new XmlFormatter(); > + operator.setClazz(EmployeeBean.class); > + operator.setDateFormat("yyyy-MM-dd"); > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + } > + > + @Override > + protected void finished(Description description) > + { > + super.finished(description); > + operator.teardown(); > + } > + > + } > + > + @Test > + public void testPojoToXmlWithoutAlias() > + { > + EmployeeBean e = new EmployeeBean(); > + e.setName("john"); > + e.setEid(1); > + e.setDept("cs"); > + e.setDateOfJoining(new > DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); > + > + operator.setup(null); > + operator.activate(null); > + operator.in.process(e); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + 
String expected = > "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" > + "<name>john</name>" > + + "<dept>cs</dept>" + "<eid>1</eid>" + > "<dateOfJoining>2015-01-01</dateOfJoining>" > + + > "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; > + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testXmlToPojoWithAlias() > + { > + EmployeeBean e = new EmployeeBean(); > + e.setName("john"); > + e.setEid(1); > + e.setDept("cs"); > + e.setDateOfJoining(new > DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); > + > + operator.setAlias("EmployeeBean"); > + operator.setup(null); > + operator.activate(null); > + operator.in.process(e); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expected = "<EmployeeBean>" + "<name>john</name>" + > "<dept>cs</dept>" + "<eid>1</eid>" > + + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; > + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testXmlToPojoWithPrettyPrint() > + { > + EmployeeBean e = new EmployeeBean(); > + e.setName("john"); > + e.setEid(1); > + e.setDept("cs"); > + e.setDateOfJoining(new > DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); > + > + operator.setAlias("EmployeeBean"); > + operator.setPrettyPrint(true); > + operator.setup(null); > + operator.activate(null); > + operator.in.process(e); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " > <dept>cs</dept>\n" + " <eid>1</eid>\n" > + + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + > "</EmployeeBean>"; > + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testPojoToXmlWithoutAliasHeirarchical() > + { > + EmployeeBean e = new EmployeeBean(); > + e.setName("john"); > + e.setEid(1); > + e.setDept("cs"); > + e.setDateOfJoining(new > DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); > + Address address = new Address(); > + address.setCity("new york"); > + address.setCountry("US"); > + e.setAddress(address); > + > + operator.setup(null); > + operator.activate(null); > + operator.in.process(e); > + System.out.println(validDataSink.collectedTuples.get(0)); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + String expected = > "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" > + "<name>john</name>" > + + "<dept>cs</dept>" + "<eid>1</eid>" + > "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" > + + "<city>new york</city>" + "<country>US</country>" + "</address>" > + + > "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; > + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); > + } > + > + public static class EmployeeBean > + { > + > + private String name; > + private String dept; > + private int eid; > + private Date dateOfJoining; > + private Address address; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public String getDept() > + { > + return dept; > + } > + > + public void setDept(String dept) 
> + { > + this.dept = dept; > + } > + > + public int getEid() > + { > + return eid; > + } > + > + public void setEid(int eid) > + { > + this.eid = eid; > + } > + > + public Date getDateOfJoining() > + { > + return dateOfJoining; > + } > + > + public void setDateOfJoining(Date dateOfJoining) > + { > + this.dateOfJoining = dateOfJoining; > + } > + > + public Address getAddress() > + { > + return address; > + } > + > + public void setAddress(Address address) > + { > + this.address = address; > + } > + } > + > + public static class Address > + { > + > + private String city; > + private String country; > + > + public String getCity() > + { > + return city; > + } > + > + public void setCity(String city) > + { > + this.city = city; > + } > + > + public String getCountry() > + { > + return country; > + } > + > + public void setCountry(String country) > + { > + this.country = country; > + } > + > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java > new file mode 100644 > index 0000000..3c31ad0 > --- /dev/null > +++ > b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java > @@ -0,0 +1,172 @@ > +package com.datatorrent.contrib.schema.parser; > + > +import java.util.Date; > + > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.rules.TestWatcher; > +import org.junit.runner.Description; > + > +import com.datatorrent.contrib.schema.parser.CsvParser; > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > + > +public class CsvParserTest > +{ > + > + CsvParser operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + @Rule > + public Watcher watcher = new Watcher(); > + > + public class Watcher extends TestWatcher > + { > + > + @Override > + protected void starting(Description description) > + { > + super.starting(description); > + operator = new CsvParser(); > + operator.setClazz(EmployeeBean.class); > + > operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + } > + > + @Override > + protected void finished(Description description) > + { > + super.finished(description); > + operator.teardown(); > + } > + > + } > + > + @Test > + public void testCsvToPojoWriterDefault() > + { > + operator.setup(null); > + String tuple = "john,cs,1,01/01/2015"; > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(new DateTime().withDate(2015, 1, > 
1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( > + pojo.getDateOfJoining())); > + } > + > + @Test > + public void testCsvToPojoWriterDateFormat() > + { > + > operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); > + operator.setup(null); > + String tuple = "john,cs,1,01-JAN-2015"; > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(new DateTime().withDate(2015, 1, > 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( > + pojo.getDateOfJoining())); > + } > + > + @Test > + public void testCsvToPojoWriterDateFormatMultiple() > + { > + > operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); > + operator.setup(null); > + String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(new DateTime().withDate(2015, 1, > 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( > + pojo.getDateOfJoining())); > + Assert.assertEquals(new DateTime().withDate(2015, 1, > 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( > + pojo.getDateOfBirth())); > + } > + > + public static class EmployeeBean > + { > + > + private String name; > + private String dept; > + private int eid; > + private Date dateOfJoining; > + private Date dateOfBirth; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public String getDept() > + { > + return dept; > + } > + > + public void setDept(String dept) > + { > + this.dept = dept; > + } > + > + public int getEid() > + { > + return eid; > + } > + > + public void setEid(int eid) > + { > + this.eid = eid; > + } > + > + public Date getDateOfJoining() > + { > + return dateOfJoining; > + } > + > + public void setDateOfJoining(Date dateOfJoining) > + { > + this.dateOfJoining = dateOfJoining; > + } > + > + public Date getDateOfBirth() > + { > + return dateOfBirth; > + } > + > + public void setDateOfBirth(Date dateOfBirth) > + { > + this.dateOfBirth = dateOfBirth; > + } > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java > new file mode 100644 > index 0000000..b453508 > --- /dev/null > +++ > 
b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java > @@ -0,0 +1,212 @@ > +package com.datatorrent.contrib.schema.parser; > + > +import java.io.ByteArrayOutputStream; > +import java.io.File; > +import java.io.IOException; > +import java.io.PrintStream; > +import java.util.Date; > +import java.util.List; > + > +import org.apache.commons.io.FileUtils; > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.runner.Description; > + > +import > com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > +import com.datatorrent.lib.util.TestUtils.TestInfo; > + > +public class JsonParserTest > +{ > + JsonParser operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); > + > + public JsonParserTest() > + { > + // So that the output is cleaner. > + System.setErr(new PrintStream(myOut)); > + } > + > + @Rule > + public TestInfo testMeta = new FSTestWatcher() > + { > + private void deleteDirectory() > + { > + try { > + FileUtils.deleteDirectory(new File(getDir())); > + } catch (IOException ex) { > + throw new RuntimeException(ex); > + } > + } > + > + @Override > + protected void starting(Description descriptor) > + { > + > + super.starting(descriptor); > + deleteDirectory(); > + > + operator = new JsonParser(); > + operator.setClazz(Test1Pojo.class); > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + operator.setup(null); > + operator.activate(null); > + > + operator.beginWindow(0); > + } > + > + @Override > + protected void finished(Description description) > + { > + operator.endWindow(); > + operator.teardown(); > + > + deleteDirectory(); > + super.finished(description); > + } > + }; > + > + @Test > + public void testJSONToPOJO() > + { > + String tuple = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; > + operator.in.put(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(Test1Pojo.class, obj.getClass()); > + Test1Pojo pojo = (Test1Pojo)obj; > + Assert.assertEquals(123, pojo.a); > + Assert.assertEquals(234876274, pojo.b); > + Assert.assertEquals("HowAreYou?", pojo.c); > + Assert.assertEquals(3, pojo.d.size()); > + Assert.assertEquals("ABC", pojo.d.get(0)); > + Assert.assertEquals("PQR", pojo.d.get(1)); > + Assert.assertEquals("XYZ", pojo.d.get(2)); > + } > + > + @Test > + public void testJSONToPOJODate() > + { > + String tuple = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; > + operator.setDateFormat("dd-MM-yyyy"); > + operator.setup(null); > + operator.activate(null); > + operator.in.put(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(Test1Pojo.class, obj.getClass()); > + Test1Pojo pojo = 
(Test1Pojo)obj; > + Assert.assertEquals(123, pojo.a); > + Assert.assertEquals(234876274, pojo.b); > + Assert.assertEquals("HowAreYou?", pojo.c); > + Assert.assertEquals(3, pojo.d.size()); > + Assert.assertEquals("ABC", pojo.d.get(0)); > + Assert.assertEquals("PQR", pojo.d.get(1)); > + Assert.assertEquals("XYZ", pojo.d.get(2)); > + Assert.assertEquals(2015, new DateTime(pojo.date).getYear()); > + Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear()); > + Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth()); > + } > + > + @Test > + public void testJSONToPOJOInvalidData() > + { > + String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}"; > + operator.in.put(tuple); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJOUnknownFields() > + { > + String tuple = > "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}"; > + operator.in.put(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(Test1Pojo.class, obj.getClass()); > + Test1Pojo pojo = (Test1Pojo)obj; > + Assert.assertEquals(123, pojo.a); > + Assert.assertEquals(234876274, pojo.b); > + Assert.assertEquals("HowAreYou?", pojo.c); > + Assert.assertEquals(null, pojo.d); > + } > + > + @Test > + public void testJSONToPOJOMismatchingFields() > + { > + String tuple = > "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; > + operator.in.put(tuple); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJOEmptyString() > + { > + String tuple = ""; > + operator.in.put(tuple); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testJSONToPOJOEmptyJSON() > + { > + String tuple = "{}"; > + operator.in.put(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(Test1Pojo.class, obj.getClass()); > + Test1Pojo pojo = (Test1Pojo)obj; > + Assert.assertEquals(0, pojo.a); > + Assert.assertEquals(0, pojo.b); > + Assert.assertEquals(null, pojo.c); > + Assert.assertEquals(null, pojo.d); > + } > + > + @Test > + public void testJSONToPOJOArrayInJson() > + { > + String tuple = > "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; > + operator.in.put(tuple); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); > + } > + > + public static class Test1Pojo > + { > + public int a; > + public long b; > + public String c; > + public List<String> d; > + public Date date; > + > + @Override > + public String toString() > + { > + return "Test1Pojo [a=" 
+ a + ", b=" + b + ", c=" + c + ", d=" + d + > ", date=" + date + "]"; > + } > + } > +} > > > http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java > ---------------------------------------------------------------------- > diff --git > a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java > b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java > new file mode 100644 > index 0000000..4298951 > --- /dev/null > +++ > b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java > @@ -0,0 +1,254 @@ > +package com.datatorrent.contrib.schema.parser; > + > +import java.util.Date; > + > +import org.joda.time.DateTime; > +import org.junit.Assert; > +import org.junit.Rule; > +import org.junit.Test; > +import org.junit.rules.TestWatcher; > +import org.junit.runner.Description; > + > +import com.datatorrent.lib.testbench.CollectorTestSink; > +import com.datatorrent.lib.util.TestUtils; > + > +public class XmlParserTest > +{ > + XmlParser operator; > + CollectorTestSink<Object> validDataSink; > + CollectorTestSink<String> invalidDataSink; > + > + @Rule > + public Watcher watcher = new Watcher(); > + > + public class Watcher extends TestWatcher > + { > + > + @Override > + protected void starting(Description description) > + { > + super.starting(description); > + operator = new XmlParser(); > + operator.setClazz(EmployeeBean.class); > + operator.setDateFormats("yyyy-MM-dd"); //setting default date > pattern > + validDataSink = new CollectorTestSink<Object>(); > + invalidDataSink = new CollectorTestSink<String>(); > + TestUtils.setSink(operator.out, validDataSink); > + TestUtils.setSink(operator.err, invalidDataSink); > + } > + > + @Override > + protected void finished(Description description) > + { > + super.finished(description); > + operator.teardown(); > + } > + > + } > + > + @Test > + public void testXmlToPojoWithoutAlias() > + { > + String tuple = > "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + > "<name>john</name>" > + + "<dept>cs</dept>" + "<eid>1</eid>" + > "<dateOfJoining>2015-01-01</dateOfJoining>" > + + > "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; > + > + operator.setup(null); > + operator.activate(null); > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(2015, new > DateTime(pojo.getDateOfJoining()).getYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getMonthOfYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getDayOfMonth()); > + } > + > + @Test > + public void testXmlToPojoWithAliasDateFormat() > + { > + String tuple = "<EmployeeBean>" + "<name>john</name>" + > "<dept>cs</dept>" + "<eid>1</eid>" > + + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + > "</EmployeeBean>"; > + > + operator.setAlias("EmployeeBean"); > + operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd"); > + operator.setup(null); > + operator.activate(null); > + operator.in.process(tuple); > + 
Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(2015, new > DateTime(pojo.getDateOfJoining()).getYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getMonthOfYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getDayOfMonth()); > + } > + > + @Test > + public void testXmlToPojoWithAlias() > + { > + String tuple = "<EmployeeBean>" + "<name>john</name>" + > "<dept>cs</dept>" + "<eid>1</eid>" > + + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; > + > + operator.setAlias("EmployeeBean"); > + operator.setup(null); > + operator.activate(null); > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(2015, new > DateTime(pojo.getDateOfJoining()).getYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getMonthOfYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getDayOfMonth()); > + } > + > + @Test > + public void testXmlToPojoIncorrectXML() > + { > + String tuple = "<EmployeeBean>" > + + "<firstname>john</firstname>" //incorrect field name > + + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 > 00:00:00.00 IST</dateOfJoining>" > + + "</EmployeeBean>"; > + > + operator.setAlias("EmployeeBean"); > + operator.setup(null); > + operator.activate(null); > + operator.in.process(tuple); > + Assert.assertEquals(0, validDataSink.collectedTuples.size()); > + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); > + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); > + } > + > + @Test > + public void testXmlToPojoWithoutAliasHeirarchical() > + { > + String tuple = > "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + > "<name>john</name>" > + + "<dept>cs</dept>" + "<eid>1</eid>" + > "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" > + + "<city>new york</city>" + "<country>US</country>" + "</address>" > + + > "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; > + > + operator.setup(null); > + operator.activate(null); > + operator.in.process(tuple); > + Assert.assertEquals(1, validDataSink.collectedTuples.size()); > + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); > + Object obj = validDataSink.collectedTuples.get(0); > + Assert.assertNotNull(obj); > + Assert.assertEquals(EmployeeBean.class, obj.getClass()); > + EmployeeBean pojo = (EmployeeBean)obj; > + Assert.assertEquals("john", pojo.getName()); > + Assert.assertEquals("cs", pojo.getDept()); > + Assert.assertEquals(1, pojo.getEid()); > + Assert.assertEquals(Address.class, pojo.getAddress().getClass()); > + Assert.assertEquals("new york", 
pojo.getAddress().getCity()); > + Assert.assertEquals("US", pojo.getAddress().getCountry()); > + Assert.assertEquals(2015, new > DateTime(pojo.getDateOfJoining()).getYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getMonthOfYear()); > + Assert.assertEquals(1, new > DateTime(pojo.getDateOfJoining()).getDayOfMonth()); > + } > + > + public static class EmployeeBean > + { > + > + private String name; > + private String dept; > + private int eid; > + private Date dateOfJoining; > + private Address address; > + > + public String getName() > + { > + return name; > + } > + > + public void setName(String name) > + { > + this.name = name; > + } > + > + public String getDept() > + { > + return dept; > + } > + > + public void setDept(String dept) > + { > + this.dept = dept; > + } > + > + public int getEid() > + { > + return eid; > + } > + > + public void setEid(int eid) > + { > + this.eid = eid; > + } > + > + public Date getDateOfJoining() > + { > + return dateOfJoining; > + } > + > + public void setDateOfJoining(Date dateOfJoining) > + { > + this.dateOfJoining = dateOfJoining; > + } > + > + public Address getAddress() > + { > + return address; > + } > + > + public void setAddress(Address address) > + { > + this.address = address; > + } > + } > + > + public static class Address > + { > + > + private String city; > + private String country; > + > + public String getCity() > + { > + return city; > + } > + > + public void setCity(String city) > + { > + this.city = city; > + } > + > + public String getCountry() > + { > + return country; > + } > + > + public void setCountry(String country) > + { > + this.country = country; > + } > + } > + > +} > >
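
A note on fixing this: each of the flagged test files above begins directly with its package declaration, which is what the rat plugin reports as an unapproved license. A minimal sketch of the fix, assuming the project uses the standard ASF source header (the exact boilerplate should be copied from an existing Malhar source file rather than from this mail), is to prepend the header to each new .java file, for example:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
// header goes above the existing package declaration; the rest of the file is unchanged
package com.datatorrent.contrib.schema.formatter;

Running "mvn apache-rat:check" locally (the rat plugin is presumably configured in the parent pom, so the same goal Travis runs should work from the contrib module) would confirm the headers are picked up before pushing a follow-up commit.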
