Moving XML and JSON parsers & formatters to Malhar-lib and changing package names for parsers & formatters
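Note on scope: CsvParser and CsvFormatter stay in the malhar-contrib module but move from com.datatorrent.contrib.schema.* to com.datatorrent.contrib.parser and com.datatorrent.contrib.formatter, while the Converter interface, the abstract Parser and Formatter operators, and the JSON/XML parsers and formatters (with their tests) move to malhar-library under com.datatorrent.lib.parser and com.datatorrent.lib.formatter. The Java sketch below is not part of the commit; it is a minimal, hypothetical example of wiring the relocated operators with their new package names. The application class, operator and stream names, directory path, and the EmployeeBean POJO are illustrative only (the bean mirrors the one in CsvFormatterTest), and the file/console I/O operators are simply convenient library endpoints for the example.

// Hypothetical usage sketch, not part of this commit. It shows the new package
// names after the move: CsvParser stays in malhar-contrib but changes package,
// while JsonFormatter (and the abstract Parser/Formatter) now live in malhar-library.
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.parser.CsvParser;      // was com.datatorrent.contrib.schema.parser.CsvParser
import com.datatorrent.lib.formatter.JsonFormatter;   // was com.datatorrent.contrib.schema.formatter.JsonFormatter
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.LineByLineFileInputOperator;

public class CsvToJsonApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Example source: reads CSV lines from a directory (path is illustrative).
    LineByLineFileInputOperator reader = dag.addOperator("csvReader", new LineByLineFileInputOperator());
    reader.setDirectory("/tmp/csv-input");

    CsvParser parser = dag.addOperator("csvParser", new CsvParser());
    // Field names, types and (for dates) formats, in the same order as the incoming CSV.
    parser.setFieldInfo("name:string,eid:integer,dateOfJoining:date|dd/MM/yyyy");

    JsonFormatter formatter = dag.addOperator("jsonFormatter", new JsonFormatter());
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());

    // Both operators are schema enabled: the POJO class is supplied as a TUPLE_CLASS port attribute.
    dag.setOutputPortAttribute(parser.out, PortContext.TUPLE_CLASS, EmployeeBean.class);
    dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, EmployeeBean.class);

    dag.addStream("csvLines", reader.output, parser.in);
    dag.addStream("pojos", parser.out, formatter.in);
    dag.addStream("jsonStrings", formatter.out, console.input);
  }

  // Illustrative POJO with getters/setters matching the fieldInfo above.
  public static class EmployeeBean
  {
    private String name;
    private int eid;
    private java.util.Date dateOfJoining;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getEid() { return eid; }
    public void setEid(int eid) { this.eid = eid; }
    public java.util.Date getDateOfJoining() { return dateOfJoining; }
    public void setDateOfJoining(java.util.Date dateOfJoining) { this.dateOfJoining = dateOfJoining; }
  }
}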
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/23dabc41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/23dabc41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/23dabc41

Branch: refs/heads/master
Commit: 23dabc414b9df2d3b0739933406ad11c2388f0b6
Parents: f070c33
Author: ishark <[email protected]>
Authored: Tue Dec 15 14:52:18 2015 -0800
Committer: ishark <[email protected]>
Committed: Tue Dec 15 14:52:18 2015 -0800

----------------------------------------------------------------------
 contrib/pom.xml                                  |   6 -
 .../contrib/converter/Converter.java             |  44 ---
 .../contrib/formatter/CsvFormatter.java          | 286 +++++++++++++++++
 .../datatorrent/contrib/parser/CsvParser.java    | 315 +++++++++++++++++++
 .../contrib/schema/formatter/CsvFormatter.java   | 286 -----------------
 .../contrib/schema/formatter/Formatter.java      | 102 ------
 .../contrib/schema/formatter/JsonFormatter.java  | 110 -------
 .../contrib/schema/formatter/XmlFormatter.java   | 173 ----------
 .../contrib/schema/parser/CsvParser.java         | 315 -------------------
 .../contrib/schema/parser/JsonParser.java        | 110 -------
 .../contrib/schema/parser/Parser.java            | 103 ------
 .../contrib/schema/parser/XmlParser.java         | 142 ---------
 .../contrib/formatter/CsvFormatterTest.java      | 165 ++++++++++
 .../contrib/parser/CsvPOJOParserTest.java        | 189 +++++++++++
 .../schema/formatter/CsvFormatterTest.java       | 165 ----------
 .../schema/formatter/JsonFormatterTest.java      | 204 ------------
 .../schema/formatter/XmlFormatterTest.java       | 244 --------------
 .../contrib/schema/parser/CsvParserTest.java     | 190 -----------
 .../contrib/schema/parser/JsonParserTest.java    | 230 --------------
 .../contrib/schema/parser/XmlParserTest.java     | 272 ----------------
 library/pom.xml                                  |  12 +
 .../datatorrent/lib/converter/Converter.java     |  44 +++
 .../datatorrent/lib/formatter/Formatter.java     | 102 ++++++
 .../lib/formatter/JsonFormatter.java             | 110 +++++++
 .../datatorrent/lib/formatter/XmlFormatter.java  | 173 ++++++++++
 .../com/datatorrent/lib/parser/JsonParser.java   | 110 +++++++
 .../java/com/datatorrent/lib/parser/Parser.java  | 103 ++++++
 .../com/datatorrent/lib/parser/XmlParser.java    | 142 +++++++++
 .../lib/formatter/JsonFormatterTest.java         | 204 ++++++++++++
 .../lib/formatter/XmlFormatterTest.java          | 243 ++++++++++++++
 .../datatorrent/lib/parser/JsonParserTest.java   | 230 ++++++++++++++
 .../datatorrent/lib/parser/XmlParserTest.java    | 272 ++++++++++++++++
 pom.xml                                          |   2 +
 33 files changed, 2702 insertions(+), 2696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index f1b6ceb..256b438 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -612,12 +612,6 @@
       <type>jar</type>
     </dependency>
     <dependency>
-      <!-- required by Xml parser and formatter -->
-      <groupId>com.thoughtworks.xstream</groupId>
-      <artifactId>xstream</artifactId>
-      <version>1.4.8</version>
-    </dependency>
-    <dependency>
       <!-- required by Csv parser and formatter -->
       <groupId>net.sf.supercsv</groupId>
       <artifactId>super-csv-joda</artifactId>


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
----------------------------------------------------------------------
diff --git
a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java deleted file mode 100644 index 601268d..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.converter; - -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Operators that are converting tuples from one format to another must - * implement this interface. Eg. Parsers or formatters , that parse data of - * certain format and convert them to another format. - * - * @param <INPUT> - * @param <OUTPUT> - * @since 3.2.0 - */ [email protected] -public interface Converter<INPUT, OUTPUT> -{ - /** - * Provide the implementation for converting tuples from one format to the - * other - * - * @param INPUT - * tuple of certain format - * @return OUTPUT tuple of converted format - */ - public OUTPUT convert(INPUT tuple); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java new file mode 100644 index 0000000..e5bbd3c --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.formatter; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.FmtDate; +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.exception.SuperCsvException; +import org.supercsv.io.CsvBeanWriter; +import org.supercsv.io.ICsvBeanWriter; +import org.supercsv.prefs.CsvPreference; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.formatter.Formatter; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts POJO to CSV string <br> + * Assumption is that each field in the delimited data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>fieldInfo</b>:User need to specify fields and their types as a comma + * separated string having format <NAME>:<TYPE>|<FORMAT> in + * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as + * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @displayName CsvFormatter + * @category Formatter + * @tags pojo csv formatter + * @since 3.2.0 + */ [email protected] +public class CsvFormatter extends Formatter<String> +{ + + private ArrayList<Field> fields; + @NotNull + protected String classname; + @NotNull + protected int fieldDelimiter; + protected String lineDelimiter; + + @NotNull + protected String fieldInfo; + + public enum FIELD_TYPE + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE + }; + + protected transient String[] nameMapping; + protected transient CellProcessor[] processors; + protected transient CsvPreference preference; + + public CsvFormatter() + { + fields = new ArrayList<Field>(); + fieldDelimiter = ','; + lineDelimiter = "\r\n"; + + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + //fieldInfo information + fields = new ArrayList<Field>(); + String[] fieldInfoTuple = fieldInfo.split(","); + for (int i = 0; i < fieldInfoTuple.length; i++) { + String[] fieldTuple = fieldInfoTuple[i].split(":"); + Field field = new Field(); + field.setName(fieldTuple[0]); + String[] typeFormat = fieldTuple[1].split("\\|"); + field.setType(typeFormat[0].toUpperCase()); + if (typeFormat.length > 1) { + field.setFormat(typeFormat[1]); + } + getFields().add(field); + } + preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); + int countKeyValue = getFields().size(); + nameMapping = new String[countKeyValue]; + processors = new CellProcessor[countKeyValue]; + initialise(nameMapping, processors); + + } + + private void initialise(String[] nameMapping, CellProcessor[] processors) + { + for (int i = 0; i < getFields().size(); i++) { + FIELD_TYPE type = getFields().get(i).type; + nameMapping[i] = getFields().get(i).name; + if (type == FIELD_TYPE.DATE) { + String dateFormat = getFields().get(i).format; + processors[i] = new Optional(new FmtDate(dateFormat == null ? 
"dd/MM/yyyy" : dateFormat)); + } else { + processors[i] = new Optional(); + } + } + + } + + @Override + public void activate(Context context) + { + + } + + @Override + public void deactivate() + { + + } + + @Override + public String convert(Object tuple) + { + try { + StringWriter stringWriter = new StringWriter(); + ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference); + beanWriter.write(tuple, nameMapping, processors); + beanWriter.flush(); + beanWriter.close(); + return stringWriter.toString(); + } catch (SuperCsvException e) { + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + return null; + } + + public static class Field + { + String name; + String format; + FIELD_TYPE type; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public FIELD_TYPE getType() + { + return type; + } + + public void setType(String type) + { + this.type = FIELD_TYPE.valueOf(type); + } + + public String getFormat() + { + return format; + } + + public void setFormat(String format) + { + this.format = format; + } + } + + /** + * Gets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @return An array list of Fields. + */ + public ArrayList<Field> getFields() + { + return fields; + } + + /** + * Sets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @param fields + * An array list of Fields. + */ + public void setFields(ArrayList<Field> fields) + { + this.fields = fields; + } + + /** + * Gets the delimiter which separates fields in incoming data. + * + * @return fieldDelimiter + */ + public int getFieldDelimiter() + { + return fieldDelimiter; + } + + /** + * Sets the delimiter which separates fields in incoming data. + * + * @param fieldDelimiter + */ + public void setFieldDelimiter(int fieldDelimiter) + { + this.fieldDelimiter = fieldDelimiter; + } + + /** + * Gets the delimiter which separates lines in incoming data. + * + * @return lineDelimiter + */ + public String getLineDelimiter() + { + return lineDelimiter; + } + + /** + * Sets the delimiter which separates line in incoming data. + * + * @param lineDelimiter + */ + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + /** + * Gets the name of the fields with type and format in data as comma separated + * string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @return fieldInfo + */ + public String getFieldInfo() + { + return fieldInfo; + } + + /** + * Sets the name of the fields with type and format in data as comma separated + * string in same order as incoming data. 
e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @param fieldInfo + */ + public void setFieldInfo(String fieldInfo) + { + this.fieldInfo = fieldInfo; + } + + private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java new file mode 100644 index 0000000..0c5f8d2 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.io.IOException; +import java.util.ArrayList; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.Optional; +import org.supercsv.cellprocessor.ParseBool; +import org.supercsv.cellprocessor.ParseChar; +import org.supercsv.cellprocessor.ParseDate; +import org.supercsv.cellprocessor.ParseDouble; +import org.supercsv.cellprocessor.ParseInt; +import org.supercsv.cellprocessor.ParseLong; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.io.CsvBeanReader; +import org.supercsv.prefs.CsvPreference; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.ReusableStringReader; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that converts CSV string to Pojo <br> + * Assumption is that each field in the delimited data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>fieldInfo</b>:User need to specify fields and their types as a comma + * separated string having format <NAME>:<TYPE>|<FORMAT> in + * the same order as incoming data. 
FORMAT refers to dates with dd/mm/yyyy as + * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br> + * <b>fieldDelimiter</b>: Default is comma <br> + * <b>lineDelimiter</b>: Default is '\r\n' + * + * @displayName CsvParser + * @category Parsers + * @tags csv pojo parser + * @since 3.2.0 + */ [email protected] +public class CsvParser extends Parser<String> +{ + + private ArrayList<Field> fields; + @NotNull + protected int fieldDelimiter; + protected String lineDelimiter; + + @NotNull + protected String fieldInfo; + + protected transient String[] nameMapping; + protected transient CellProcessor[] processors; + private transient CsvBeanReader csvReader; + + public enum FIELD_TYPE + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE + }; + + @NotNull + private transient ReusableStringReader csvStringReader = new ReusableStringReader(); + + public CsvParser() + { + fields = new ArrayList<Field>(); + fieldDelimiter = ','; + lineDelimiter = "\r\n"; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + + logger.info("field info {}", fieldInfo); + fields = new ArrayList<Field>(); + String[] fieldInfoTuple = fieldInfo.split(","); + for (int i = 0; i < fieldInfoTuple.length; i++) { + String[] fieldTuple = fieldInfoTuple[i].split(":"); + Field field = new Field(); + field.setName(fieldTuple[0]); + String[] typeFormat = fieldTuple[1].split("\\|"); + field.setType(typeFormat[0].toUpperCase()); + if (typeFormat.length > 1) { + field.setFormat(typeFormat[1]); + } + getFields().add(field); + } + + CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); + csvReader = new CsvBeanReader(csvStringReader, preference); + int countKeyValue = getFields().size(); + logger.info("countKeyValue {}", countKeyValue); + nameMapping = new String[countKeyValue]; + processors = new CellProcessor[countKeyValue]; + initialise(nameMapping, processors); + } + + private void initialise(String[] nameMapping, CellProcessor[] processors) + { + for (int i = 0; i < getFields().size(); i++) { + FIELD_TYPE type = getFields().get(i).type; + nameMapping[i] = getFields().get(i).name; + if (type == FIELD_TYPE.DOUBLE) { + processors[i] = new Optional(new ParseDouble()); + } else if (type == FIELD_TYPE.INTEGER) { + processors[i] = new Optional(new ParseInt()); + } else if (type == FIELD_TYPE.FLOAT) { + processors[i] = new Optional(new ParseDouble()); + } else if (type == FIELD_TYPE.LONG) { + processors[i] = new Optional(new ParseLong()); + } else if (type == FIELD_TYPE.SHORT) { + processors[i] = new Optional(new ParseInt()); + } else if (type == FIELD_TYPE.STRING) { + processors[i] = new Optional(); + } else if (type == FIELD_TYPE.CHARACTER) { + processors[i] = new Optional(new ParseChar()); + } else if (type == FIELD_TYPE.BOOLEAN) { + processors[i] = new Optional(new ParseBool()); + } else if (type == FIELD_TYPE.DATE) { + String dateFormat = getFields().get(i).format; + processors[i] = new Optional(new ParseDate(dateFormat == null ? 
"dd/MM/yyyy" : dateFormat)); + } + } + } + + @Override + public void activate(Context context) + { + + } + + @Override + public void deactivate() + { + + } + + @Override + public Object convert(String tuple) + { + try { + csvStringReader.open(tuple); + return csvReader.read(clazz, nameMapping, processors); + } catch (IOException e) { + logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + return null; + } + } + + @Override + public void teardown() + { + try { + if (csvReader != null) { + csvReader.close(); + } + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + public static class Field + { + String name; + String format; + FIELD_TYPE type; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public FIELD_TYPE getType() + { + return type; + } + + public void setType(String type) + { + this.type = FIELD_TYPE.valueOf(type); + } + + public String getFormat() + { + return format; + } + + public void setFormat(String format) + { + this.format = format; + } + + } + + /** + * Gets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @return An array list of Fields. + */ + public ArrayList<Field> getFields() + { + return fields; + } + + /** + * Sets the array list of the fields, a field being a POJO containing the name + * of the field and type of field. + * + * @param fields + * An array list of Fields. + */ + public void setFields(ArrayList<Field> fields) + { + this.fields = fields; + } + + /** + * Gets the delimiter which separates fields in incoming data. + * + * @return fieldDelimiter + */ + public int getFieldDelimiter() + { + return fieldDelimiter; + } + + /** + * Sets the delimiter which separates fields in incoming data. + * + * @param fieldDelimiter + */ + public void setFieldDelimiter(int fieldDelimiter) + { + this.fieldDelimiter = fieldDelimiter; + } + + /** + * Gets the delimiter which separates lines in incoming data. + * + * @return lineDelimiter + */ + public String getLineDelimiter() + { + return lineDelimiter; + } + + /** + * Sets the delimiter which separates line in incoming data. + * + * @param lineDelimiter + */ + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + /** + * Gets the name of the fields with type and format ( for date ) as comma + * separated string in same order as incoming data. e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @return fieldInfo + */ + public String getFieldInfo() + { + return fieldInfo; + } + + /** + * Sets the name of the fields with type and format ( for date ) as comma + * separated string in same order as incoming data. 
e.g + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * + * @param fieldInfo + */ + public void setFieldInfo(String fieldInfo) + { + this.fieldInfo = fieldInfo; + } + + private static final Logger logger = LoggerFactory.getLogger(CsvParser.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java deleted file mode 100644 index 490c4f2..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.formatter; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.ArrayList; - -import javax.validation.constraints.NotNull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.supercsv.cellprocessor.FmtDate; -import org.supercsv.cellprocessor.Optional; -import org.supercsv.cellprocessor.ift.CellProcessor; -import org.supercsv.exception.SuperCsvException; -import org.supercsv.io.CsvBeanWriter; -import org.supercsv.io.ICsvBeanWriter; -import org.supercsv.prefs.CsvPreference; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Operator that converts POJO to CSV string <br> - * Assumption is that each field in the delimited data should map to a simple - * java type.<br> - * <br> - * <b>Properties</b> <br> - * <b>fieldInfo</b>:User need to specify fields and their types as a comma - * separated string having format <NAME>:<TYPE>|<FORMAT> in - * the same order as incoming data. 
FORMAT refers to dates with dd/mm/yyyy as - * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @displayName CsvFormatter - * @category Formatter - * @tags pojo csv formatter - * @since 3.2.0 - */ [email protected] -public class CsvFormatter extends Formatter<String> -{ - - private ArrayList<Field> fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; - - @NotNull - protected String fieldInfo; - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; - - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; - - public CsvFormatter() - { - fields = new ArrayList<Field>(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; - - } - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - - //fieldInfo information - fields = new ArrayList<Field>(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); - int countKeyValue = getFields().size(); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); - - } - - private void initialise(String[] nameMapping, CellProcessor[] processors) - { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat)); - } else { - processors[i] = new Optional(); - } - } - - } - - @Override - public void activate(Context context) - { - - } - - @Override - public void deactivate() - { - - } - - @Override - public String convert(Object tuple) - { - try { - StringWriter stringWriter = new StringWriter(); - ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference); - beanWriter.write(tuple, nameMapping, processors); - beanWriter.flush(); - beanWriter.close(); - return stringWriter.toString(); - } catch (SuperCsvException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - return null; - } - - public static class Field - { - String name; - String format; - FIELD_TYPE type; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public FIELD_TYPE getType() - { - return type; - } - - public void setType(String type) - { - this.type = FIELD_TYPE.valueOf(type); - } - - public String getFormat() - { - return format; - } - - public void setFormat(String format) - { - this.format = format; - } - } - - /** - * Gets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @return An array list of Fields. 
- */ - public ArrayList<Field> getFields() - { - return fields; - } - - /** - * Sets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @param fields - * An array list of Fields. - */ - public void setFields(ArrayList<Field> fields) - { - this.fields = fields; - } - - /** - * Gets the delimiter which separates fields in incoming data. - * - * @return fieldDelimiter - */ - public int getFieldDelimiter() - { - return fieldDelimiter; - } - - /** - * Sets the delimiter which separates fields in incoming data. - * - * @param fieldDelimiter - */ - public void setFieldDelimiter(int fieldDelimiter) - { - this.fieldDelimiter = fieldDelimiter; - } - - /** - * Gets the delimiter which separates lines in incoming data. - * - * @return lineDelimiter - */ - public String getLineDelimiter() - { - return lineDelimiter; - } - - /** - * Sets the delimiter which separates line in incoming data. - * - * @param lineDelimiter - */ - public void setLineDelimiter(String lineDelimiter) - { - this.lineDelimiter = lineDelimiter; - } - - /** - * Gets the name of the fields with type and format in data as comma separated - * string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @return fieldInfo - */ - public String getFieldInfo() - { - return fieldInfo; - } - - /** - * Sets the name of the fields with type and format in data as comma separated - * string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @param fieldInfo - */ - public void setFieldInfo(String fieldInfo) - { - this.fieldInfo = fieldInfo; - } - - private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java deleted file mode 100644 index 77fa630..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.schema.formatter; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.ActivationListener; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.contrib.converter.Converter; - -/** - * Abstract class that implements Converter interface. This is a schema enabled - * Formatter <br> - * Sub classes need to implement the convert method <br> - * <b>Port Interface</b><br> - * <b>in</b>: expects <Object> this is a schema enabled port<br> - * <b>out</b>: emits <OUTPUT> <br> - * <b>err</b>: emits <Object> error port that emits input tuple that could - * not be converted<br> - * <br> - * - * @displayName Parser - * @tags parser converter - * @param <INPUT> - * @since 3.2.0 - */ [email protected] -public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>, - ActivationListener<Context> -{ - protected transient Class<?> clazz; - - @OutputPortFieldAnnotation - public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>(); - - @OutputPortFieldAnnotation(optional = true) - public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>(); - - @InputPortFieldAnnotation(schemaRequired = true) - public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>() - { - public void setup(PortContext context) - { - clazz = context.getValue(Context.PortContext.TUPLE_CLASS); - } - - @Override - public void process(Object inputTuple) - { - OUTPUT tuple = convert(inputTuple); - if (tuple == null && err.isConnected()) { - err.emit(inputTuple); - return; - } - if (out.isConnected()) { - out.emit(tuple); - } - } - }; - - /** - * Get the class that needs to be formatted - * - * @return Class<?> - */ - public Class<?> getClazz() - { - return clazz; - } - - /** - * Set the class of tuple that needs to be formatted - * - * @param clazz - */ - public void setClazz(Class<?> clazz) - { - this.clazz = clazz; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java deleted file mode 100644 index 5f7bce6..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.formatter; - -import java.io.IOException; -import java.text.SimpleDateFormat; - -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectWriter; -import org.codehaus.jackson.map.SerializationConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Operator that converts POJO to JSON string <br> - * <b>Properties</b> <br> - * <b>dateFormat</b>: date format e.g dd/MM/yyyy - * - * @displayName JsonFormatter - * @category Formatter - * @tags pojo json formatter - * @since 3.2.0 - */ [email protected] -public class JsonFormatter extends Formatter<String> -{ - private transient ObjectWriter writer; - protected String dateFormat; - - @Override - public void activate(Context context) - { - try { - ObjectMapper mapper = new ObjectMapper(); - if (dateFormat != null) { - mapper.setDateFormat(new SimpleDateFormat(dateFormat)); - } - writer = mapper.writerWithType(clazz); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } - } - - @Override - public void deactivate() - { - - } - - @Override - public String convert(Object tuple) - { - try { - return writer.writeValueAsString(tuple); - } catch (JsonGenerationException | JsonMappingException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - return null; - } - - /** - * Get the date format - * - * @return Date format string - */ - public String getDateFormat() - { - return dateFormat; - } - - /** - * Set the date format - * - * @param dateFormat - */ - public void setDateFormat(String dateFormat) - { - this.dateFormat = dateFormat; - } - - private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java deleted file mode 100644 index 40fef69..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.formatter; - -import java.io.Writer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; - -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.XStreamException; -import com.thoughtworks.xstream.converters.basic.DateConverter; -import com.thoughtworks.xstream.io.HierarchicalStreamWriter; -import com.thoughtworks.xstream.io.xml.CompactWriter; -import com.thoughtworks.xstream.io.xml.XppDriver; - -/** - * @displayName XmlParser - * @category Formatter - * @tags xml pojo formatter - * @since 3.2.0 - */ [email protected] -public class XmlFormatter extends Formatter<String> -{ - - private transient XStream xstream; - - protected String alias; - protected String dateFormat; - protected boolean prettyPrint; - - public XmlFormatter() - { - alias = null; - dateFormat = null; - } - - @Override - public void activate(Context context) - { - if (prettyPrint) { - xstream = new XStream(); - } else { - xstream = new XStream(new XppDriver() - { - @Override - public HierarchicalStreamWriter createWriter(Writer out) - { - return new CompactWriter(out, getNameCoder()); - } - }); - } - if (alias != null) { - try { - xstream.alias(alias, clazz); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } - } - if (dateFormat != null) { - xstream.registerConverter(new DateConverter(dateFormat, new String[] {})); - } - } - - @Override - public void deactivate() - { - - } - - @Override - public String convert(Object tuple) - { - try { - return xstream.toXML(tuple); - } catch (XStreamException e) { - logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage()); - return null; - } - } - - /** - * Gets the alias This is an optional step. Without it XStream would work - * fine, but the XML element names would contain the fully qualified name of - * each class (including package) which would bulk up the XML a bit. - * - * @return alias. - */ - public String getAlias() - { - return alias; - } - - /** - * Sets the alias This is an optional step. Without it XStream would work - * fine, but the XML element names would contain the fully qualified name of - * each class (including package) which would bulk up the XML a bit. - * - * @param alias - * . - */ - public void setAlias(String alias) - { - this.alias = alias; - } - - /** - * Gets the date format e.g dd/mm/yyyy - this will be how a date would be - * formatted - * - * @return dateFormat. - */ - public String getDateFormat() - { - return dateFormat; - } - - /** - * Sets the date format e.g dd/mm/yyyy - this will be how a date would be - * formatted - * - * @param dateFormat - * . - */ - public void setDateFormat(String dateFormat) - { - this.dateFormat = dateFormat; - } - - /** - * Returns true if pretty print is enabled. 
- * - * @return prettyPrint - */ - public boolean isPrettyPrint() - { - return prettyPrint; - } - - /** - * Sets pretty print option. - * - * @param prettyPrint - */ - public void setPrettyPrint(boolean prettyPrint) - { - this.prettyPrint = prettyPrint; - } - - private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class); - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java deleted file mode 100644 index 991f6eb..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.parser; - -import java.io.IOException; -import java.util.ArrayList; - -import javax.validation.constraints.NotNull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.supercsv.cellprocessor.Optional; -import org.supercsv.cellprocessor.ParseBool; -import org.supercsv.cellprocessor.ParseChar; -import org.supercsv.cellprocessor.ParseDate; -import org.supercsv.cellprocessor.ParseDouble; -import org.supercsv.cellprocessor.ParseInt; -import org.supercsv.cellprocessor.ParseLong; -import org.supercsv.cellprocessor.ift.CellProcessor; -import org.supercsv.io.CsvBeanReader; -import org.supercsv.prefs.CsvPreference; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.util.ReusableStringReader; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Operator that converts CSV string to Pojo <br> - * Assumption is that each field in the delimited data should map to a simple - * java type.<br> - * <br> - * <b>Properties</b> <br> - * <b>fieldInfo</b>:User need to specify fields and their types as a comma - * separated string having format <NAME>:<TYPE>|<FORMAT> in - * the same order as incoming data. 
FORMAT refers to dates with dd/mm/yyyy as - * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br> - * <b>fieldDelimiter</b>: Default is comma <br> - * <b>lineDelimiter</b>: Default is '\r\n' - * - * @displayName CsvParser - * @category Parsers - * @tags csv pojo parser - * @since 3.2.0 - */ [email protected] -public class CsvParser extends Parser<String> -{ - - private ArrayList<Field> fields; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; - - @NotNull - protected String fieldInfo; - - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - private transient CsvBeanReader csvReader; - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; - - @NotNull - private transient ReusableStringReader csvStringReader = new ReusableStringReader(); - - public CsvParser() - { - fields = new ArrayList<Field>(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; - } - - @Override - public void setup(OperatorContext context) - { - super.setup(context); - - logger.info("field info {}", fieldInfo); - fields = new ArrayList<Field>(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - - CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); - csvReader = new CsvBeanReader(csvStringReader, preference); - int countKeyValue = getFields().size(); - logger.info("countKeyValue {}", countKeyValue); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); - } - - private void initialise(String[] nameMapping, CellProcessor[] processors) - { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DOUBLE) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.INTEGER) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.FLOAT) { - processors[i] = new Optional(new ParseDouble()); - } else if (type == FIELD_TYPE.LONG) { - processors[i] = new Optional(new ParseLong()); - } else if (type == FIELD_TYPE.SHORT) { - processors[i] = new Optional(new ParseInt()); - } else if (type == FIELD_TYPE.STRING) { - processors[i] = new Optional(); - } else if (type == FIELD_TYPE.CHARACTER) { - processors[i] = new Optional(new ParseChar()); - } else if (type == FIELD_TYPE.BOOLEAN) { - processors[i] = new Optional(new ParseBool()); - } else if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new ParseDate(dateFormat == null ? 
"dd/MM/yyyy" : dateFormat)); - } - } - } - - @Override - public void activate(Context context) - { - - } - - @Override - public void deactivate() - { - - } - - @Override - public Object convert(String tuple) - { - try { - csvStringReader.open(tuple); - return csvReader.read(clazz, nameMapping, processors); - } catch (IOException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - return null; - } - } - - @Override - public void teardown() - { - try { - if (csvReader != null) { - csvReader.close(); - } - } catch (IOException e) { - DTThrowable.rethrow(e); - } - } - - public static class Field - { - String name; - String format; - FIELD_TYPE type; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public FIELD_TYPE getType() - { - return type; - } - - public void setType(String type) - { - this.type = FIELD_TYPE.valueOf(type); - } - - public String getFormat() - { - return format; - } - - public void setFormat(String format) - { - this.format = format; - } - - } - - /** - * Gets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @return An array list of Fields. - */ - public ArrayList<Field> getFields() - { - return fields; - } - - /** - * Sets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. - * - * @param fields - * An array list of Fields. - */ - public void setFields(ArrayList<Field> fields) - { - this.fields = fields; - } - - /** - * Gets the delimiter which separates fields in incoming data. - * - * @return fieldDelimiter - */ - public int getFieldDelimiter() - { - return fieldDelimiter; - } - - /** - * Sets the delimiter which separates fields in incoming data. - * - * @param fieldDelimiter - */ - public void setFieldDelimiter(int fieldDelimiter) - { - this.fieldDelimiter = fieldDelimiter; - } - - /** - * Gets the delimiter which separates lines in incoming data. - * - * @return lineDelimiter - */ - public String getLineDelimiter() - { - return lineDelimiter; - } - - /** - * Sets the delimiter which separates line in incoming data. - * - * @param lineDelimiter - */ - public void setLineDelimiter(String lineDelimiter) - { - this.lineDelimiter = lineDelimiter; - } - - /** - * Gets the name of the fields with type and format ( for date ) as comma - * separated string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @return fieldInfo - */ - public String getFieldInfo() - { - return fieldInfo; - } - - /** - * Sets the name of the fields with type and format ( for date ) as comma - * separated string in same order as incoming data. 
e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @param fieldInfo - */ - public void setFieldInfo(String fieldInfo) - { - this.fieldInfo = fieldInfo; - } - - private static final Logger logger = LoggerFactory.getLogger(CsvParser.class); - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java deleted file mode 100644 index 513be15..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.parser; - -import java.io.IOException; -import java.text.SimpleDateFormat; - -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Operator that converts JSON string to Pojo <br> - * <b>Properties</b> <br> - * <b>dateFormat</b>: date format e.g dd/MM/yyyy - * - * @displayName JsonParser - * @category Parsers - * @tags json pojo parser - * @since 3.2.0 - */ [email protected] -public class JsonParser extends Parser<String> -{ - - private transient ObjectReader reader; - protected String dateFormat; - - @Override - public void activate(Context context) - { - try { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - if (dateFormat != null) { - mapper.setDateFormat(new SimpleDateFormat(dateFormat)); - } - reader = mapper.reader(clazz); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } - } - - @Override - public void deactivate() - { - } - - @Override - public Object convert(String tuple) - { - try { - if (!StringUtils.isEmpty(tuple)) { - return reader.readValue(tuple); - } - } catch (JsonProcessingException e) { - logger.debug("Error while converting tuple {} {}", tuple, e.getMessage()); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - return null; - } - - /** - * Get the date format - * - * @return Date format string - */ - public String getDateFormat() - { - return dateFormat; - } - - /** - * Set the 
date format - * - * @param dateFormat - */ - public void setDateFormat(String dateFormat) - { - this.dateFormat = dateFormat; - } - - private static final Logger logger = LoggerFactory.getLogger(JsonParser.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java deleted file mode 100644 index 3c1df8f..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.parser; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.ActivationListener; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.contrib.converter.Converter; - -/** - * Abstract class that implements Converter interface. 
This is a schema enabled - * Parser <br> - * Sub classes need to implement the convert method <br> - * <br> - * <b>Port Interface</b><br> - * <b>in</b>: expects <INPUT><br> - * <b>out</b>: emits <Object> this is a schema enabled port<br> - * <b>err</b>: emits <INPUT> error port that emits input tuple that could - * not be converted<br> - * <br> - * - * @displayName Parser - * @tags parser converter - * @param <INPUT> - * @since 3.2.0 - */ [email protected] -public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>, - ActivationListener<Context> -{ - protected transient Class<?> clazz; - - @OutputPortFieldAnnotation(schemaRequired = true) - public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>() - { - public void setup(PortContext context) - { - clazz = context.getValue(Context.PortContext.TUPLE_CLASS); - } - }; - - @OutputPortFieldAnnotation(optional = true) - public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>(); - - public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>() - { - @Override - public void process(INPUT inputTuple) - { - Object tuple = convert(inputTuple); - if (tuple == null && err.isConnected()) { - err.emit(inputTuple); - return; - } - if (out.isConnected()) { - out.emit(tuple); - } - } - }; - - /** - * Get the class that needs to be formatted - * - * @return Class<?> - */ - public Class<?> getClazz() - { - return clazz; - } - - /** - * Set the class of tuple that needs to be formatted - * - * @param clazz - */ - public void setClazz(Class<?> clazz) - { - this.clazz = clazz; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java deleted file mode 100644 index 9e1c8be..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.parser; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.XStreamException; -import com.thoughtworks.xstream.converters.basic.DateConverter; - -import com.datatorrent.api.Context; - -/** - * Operator that converts XML string to Pojo <br> - * <b>Properties</b> <br> - * <b>alias</b>:This maps to the root element of the XML string. 
If not - * specified, parser would expect the root element to be fully qualified name of - * the Pojo Class. <br> - * <b>dateFormats</b>: Comma separated string of date formats e.g - * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default - * - * @displayName XmlParser - * @category Parsers - * @tags xml pojo parser - * @since 3.2.0 - */ [email protected] -public class XmlParser extends Parser<String> -{ - - private transient XStream xstream; - protected String alias; - protected String dateFormats; - - public XmlParser() - { - alias = null; - dateFormats = null; - } - - @Override - public void activate(Context context) - { - xstream = new XStream(); - if (alias != null) { - try { - xstream.alias(alias, clazz); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } - } - if (dateFormats != null) { - String[] dateFormat = dateFormats.split(","); - xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat)); - } - } - - @Override - public void deactivate() - { - - } - - @Override - public Object convert(String tuple) - { - try { - return xstream.fromXML(tuple); - } catch (XStreamException e) { - logger.debug("Error while converting tuple {} {}", tuple,e.getMessage()); - return null; - } - } - - /** - * Gets the alias - * - * @return alias. - */ - public String getAlias() - { - return alias; - } - - /** - * Sets the alias This maps to the root element of the XML string. If not - * specified, parser would expect the root element to be fully qualified name - * of the Pojo Class. - * - * @param alias - * . - */ - public void setAlias(String alias) - { - this.alias = alias; - } - - /** - * Gets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy - * where first one would be considered default - * - * @return dateFormats. - */ - public String getDateFormats() - { - return dateFormats; - } - - /** - * Sets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy - * where first one would be considered default - * - * @param dateFormats - * . - */ - public void setDateFormats(String dateFormats) - { - this.dateFormats = dateFormats; - } - - private static final Logger logger = LoggerFactory.getLogger(XmlParser.class); - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java new file mode 100644 index 0000000..13d9739 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.formatter; + +import java.util.Date; + +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class CsvFormatterTest +{ + + CsvFormatter operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + + @Override + protected void starting(Description description) + { + super.starting(description); + operator = new CsvFormatter(); + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); + operator.setLineDelimiter("\r\n"); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + operator.teardown(); + } + + } + + @Test + public void testPojoReaderToCsv() + { + operator.setup(null); + EmployeeBean emp = new EmployeeBean(); + emp.setName("john"); + emp.setDept("cs"); + emp.setEid(1); + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); + operator.in.process(emp); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String csvOp = (String)validDataSink.collectedTuples.get(0); + Assert.assertNotNull(csvOp); + Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp); + } + + @Test + public void testPojoReaderToCsvMultipleDate() + { + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); + operator.setup(null); + EmployeeBean emp = new EmployeeBean(); + emp.setName("john"); + emp.setDept("cs"); + emp.setEid(1); + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); + emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); + operator.in.process(emp); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + String csvOp = (String)validDataSink.collectedTuples.get(0); + Assert.assertNotNull(csvOp); + Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp); + } + + public static class EmployeeBean + { + + private String name; + private String dept; + private int eid; + private Date dateOfJoining; + private Date dateOfBirth; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getDept() + { + return dept; + } + + public void setDept(String dept) + { + this.dept = dept; + } + + public int getEid() + { + return eid; + } + + public void setEid(int eid) + { + this.eid = eid; + } + + public Date getDateOfJoining() + { + return dateOfJoining; + } + + public void setDateOfJoining(Date dateOfJoining) + { + this.dateOfJoining = dateOfJoining; + } + + public Date getDateOfBirth() + { + return dateOfBirth; + } + + public void setDateOfBirth(Date dateOfBirth) + { + this.dateOfBirth = 
dateOfBirth; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java new file mode 100644 index 0000000..c9a4179 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.util.Date; + +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class CsvPOJOParserTest +{ + + CsvParser operator; + CollectorTestSink<Object> validDataSink; + CollectorTestSink<String> invalidDataSink; + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + + @Override + protected void starting(Description description) + { + super.starting(description); + operator = new CsvParser(); + operator.setClazz(EmployeeBean.class); + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); + validDataSink = new CollectorTestSink<Object>(); + invalidDataSink = new CollectorTestSink<String>(); + TestUtils.setSink(operator.out, validDataSink); + TestUtils.setSink(operator.err, invalidDataSink); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + operator.teardown(); + } + + } + + @Test + public void testCsvToPojoWriterDefault() + { + operator.setup(null); + String tuple = "john,cs,1,01/01/2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + } + + @Test + public void testCsvToPojoWriterDateFormat() + { + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); + operator.setup(null); + String tuple = 
"john,cs,1,01-JAN-2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + } + + @Test + public void testCsvToPojoWriterDateFormatMultiple() + { + operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); + operator.setup(null); + String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; + operator.in.process(tuple); + Assert.assertEquals(1, validDataSink.collectedTuples.size()); + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); + Object obj = validDataSink.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(EmployeeBean.class, obj.getClass()); + EmployeeBean pojo = (EmployeeBean)obj; + Assert.assertEquals("john", pojo.getName()); + Assert.assertEquals("cs", pojo.getDept()); + Assert.assertEquals(1, pojo.getEid()); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfJoining())); + Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( + pojo.getDateOfBirth())); + } + + public static class EmployeeBean + { + + private String name; + private String dept; + private int eid; + private Date dateOfJoining; + private Date dateOfBirth; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getDept() + { + return dept; + } + + public void setDept(String dept) + { + this.dept = dept; + } + + public int getEid() + { + return eid; + } + + public void setEid(int eid) + { + this.eid = eid; + } + + public Date getDateOfJoining() + { + return dateOfJoining; + } + + public void setDateOfJoining(Date dateOfJoining) + { + this.dateOfJoining = dateOfJoining; + } + + public Date getDateOfBirth() + { + return dateOfBirth; + } + + public void setDateOfBirth(Date dateOfBirth) + { + this.dateOfBirth = dateOfBirth; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java deleted file mode 100644 index 8183381..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.formatter; - -import java.util.Date; - -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import com.datatorrent.contrib.schema.formatter.CsvFormatter; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - -public class CsvFormatterTest -{ - - CsvFormatter operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - @Rule - public Watcher watcher = new Watcher(); - - public class Watcher extends TestWatcher - { - - @Override - protected void starting(Description description) - { - super.starting(description); - operator = new CsvFormatter(); - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); - operator.setLineDelimiter("\r\n"); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - operator.teardown(); - } - - } - - @Test - public void testPojoReaderToCsv() - { - operator.setup(null); - EmployeeBean emp = new EmployeeBean(); - emp.setName("john"); - emp.setDept("cs"); - emp.setEid(1); - emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); - operator.in.process(emp); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String csvOp = (String)validDataSink.collectedTuples.get(0); - Assert.assertNotNull(csvOp); - Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp); - } - - @Test - public void testPojoReaderToCsvMultipleDate() - { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); - operator.setup(null); - EmployeeBean emp = new EmployeeBean(); - emp.setName("john"); - emp.setDept("cs"); - emp.setEid(1); - emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); - emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); - operator.in.process(emp); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String csvOp = (String)validDataSink.collectedTuples.get(0); - Assert.assertNotNull(csvOp); - Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp); - } - - public static class EmployeeBean - { - - private String name; - private String dept; - private int eid; - private Date dateOfJoining; - private Date dateOfBirth; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public String getDept() - { - return dept; - } - - public void setDept(String dept) - { - this.dept = dept; - } - - public int getEid() - { - return eid; - } - - public void setEid(int 
eid) - { - this.eid = eid; - } - - public Date getDateOfJoining() - { - return dateOfJoining; - } - - public void setDateOfJoining(Date dateOfJoining) - { - this.dateOfJoining = dateOfJoining; - } - - public Date getDateOfBirth() - { - return dateOfBirth; - } - - public void setDateOfBirth(Date dateOfBirth) - { - this.dateOfBirth = dateOfBirth; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java deleted file mode 100644 index d377b07..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.formatter; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Date; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.Description; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import com.datatorrent.lib.util.TestUtils.TestInfo; -import com.google.common.collect.Lists; - -public class JsonFormatterTest -{ - JsonFormatter operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); - - public JsonFormatterTest() - { - // So that the output is cleaner. 
- System.setErr(new PrintStream(myOut)); - } - - @Rule - public TestInfo testMeta = new FSTestWatcher() - { - private void deleteDirectory() - { - try { - FileUtils.deleteDirectory(new File(getDir())); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Override - protected void starting(Description descriptor) - { - super.starting(descriptor); - deleteDirectory(); - - operator = new JsonFormatter(); - - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - operator.setup(null); - operator.activate(null); - - operator.beginWindow(0); - } - - @Override - protected void finished(Description description) - { - operator.endWindow(); - operator.teardown(); - - deleteDirectory(); - super.finished(description); - } - }; - - @Test - public void testJSONToPOJO() - { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); - - operator.in.put(pojo); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJODate() - { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); - pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate(); - operator.setDateFormat("dd-MM-yyyy"); - operator.setup(null); - operator.activate(null); - operator.in.put(pojo); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJONullFields() - { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = null; - - operator.in.put(pojo); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOEmptyPOJO() - { - Test1Pojo pojo = new Test1Pojo(); - operator.in.put(pojo); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; - System.out.println(validDataSink.collectedTuples.get(0)); - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJONullPOJO() - { - operator.in.put(null); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "null"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } 
- - @Test - public void testJSONToPOJONoFieldPOJO() - { - operator.endWindow(); - operator.teardown(); - operator.setClazz(Test2Pojo.class); - operator.setup(null); - operator.beginWindow(1); - - Test2Pojo o = new Test2Pojo(); - operator.in.put(o); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0)); - } - - public static class Test1Pojo - { - public int a; - public long b; - public String c; - public List<String> d; - public Date date; - - @Override - public String toString() - { - return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; - } - } - - public static class Test2Pojo - { - } - -}
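
For readers tracking the package rename, the fragment below is an editorial sketch, not part of this commit: it drives the CSV operators from their renamed com.datatorrent.contrib.parser and com.datatorrent.contrib.formatter packages, mirroring the patterns in CsvPOJOParserTest and CsvFormatterTest above. The EmployeeBean reference, the sample record, and the class name CsvRoundTripSketch are illustrative assumptions taken from or invented around those tests.

// Editorial sketch only (assumes the test classes above are on the classpath).
import com.datatorrent.contrib.formatter.CsvFormatter;
import com.datatorrent.contrib.parser.CsvParser;
import com.datatorrent.contrib.parser.CsvPOJOParserTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;

public class CsvRoundTripSketch
{
  public static void main(String[] args)
  {
    // Parse a delimited record into a POJO, as CsvPOJOParserTest does:
    // the target class and the field-info schema drive the conversion.
    CsvParser parser = new CsvParser();
    parser.setClazz(CsvPOJOParserTest.EmployeeBean.class);
    parser.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
    CollectorTestSink<Object> parsed = new CollectorTestSink<Object>();
    TestUtils.setSink(parser.out, parsed);
    parser.setup(null);
    parser.in.process("john,cs,1,01/01/2015");

    // Format the resulting POJO back into a line, as CsvFormatterTest does.
    CsvFormatter formatter = new CsvFormatter();
    formatter.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
    formatter.setLineDelimiter("\r\n");
    CollectorTestSink<Object> lines = new CollectorTestSink<Object>();
    TestUtils.setSink(formatter.out, lines);
    formatter.setup(null);
    formatter.in.process(parsed.collectedTuples.get(0));

    // Expected to print "john,cs,1,01/01/2015" followed by the line delimiter,
    // matching the assertion in testPojoReaderToCsv above.
    System.out.print(lines.collectedTuples.get(0));

    formatter.teardown();
    parser.teardown();
  }
}

As in the tests, no err port is connected here; per the Parser base class shown earlier in this diff, a record that fails conversion would simply be dropped unless err is connected, in which case the raw input tuple is emitted there instead.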
