Repository: apex-malhar Updated Branches: refs/heads/master 18588f3a5 -> 3df6715d7
APEXMALHAR-2105 enhancing CSV formatter to read field info from schema Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3df6715d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3df6715d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3df6715d Branch: refs/heads/master Commit: 3df6715d7aec3094b7794ae7becd2e12dbeda4b3 Parents: 18588f3 Author: shubham <[email protected]> Authored: Wed Jun 1 18:47:36 2016 +0530 Committer: Chinmay Kolhatkar <[email protected]> Committed: Mon Jun 6 16:36:07 2016 -0700 ---------------------------------------------------------------------- .../contrib/formatter/CsvFormatter.java | 263 ++++++++----------- .../contrib/formatter/CsvFormatterTest.java | 164 ++++++------ 2 files changed, 195 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3df6715d/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java index 4b4bae3..34ba49c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java +++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java @@ -20,7 +20,7 @@ package com.datatorrent.contrib.formatter; import java.io.IOException; import java.io.StringWriter; -import java.util.ArrayList; +import java.util.List; import javax.validation.constraints.NotNull; @@ -34,7 +34,13 @@ import org.supercsv.io.CsvBeanWriter; import org.supercsv.io.ICsvBeanWriter; import org.supercsv.prefs.CsvPreference; +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; +import 
com.datatorrent.contrib.parser.DelimitedSchema; +import com.datatorrent.contrib.parser.DelimitedSchema.Field; +import com.datatorrent.contrib.parser.DelimitedSchema.FieldType; import com.datatorrent.lib.formatter.Formatter; import com.datatorrent.netlet.util.DTThrowable; @@ -44,10 +50,14 @@ import com.datatorrent.netlet.util.DTThrowable; * java type.<br> * <br> * <b>Properties</b> <br> - * <b>fieldInfo</b>:User need to specify fields and their types as a comma - * separated string having format <NAME>:<TYPE>|<FORMAT> in - * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as - * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy + * <b>schema</b>:schema as a string specified in a json format as per + * {@link DelimitedSchema}. Currently only Date constraint (fmt) is supported + * <br> + * <b>clazz</b>:Pojo class <br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a POJO. Each tuple represents a record<br> + * <b>out</b>:tuples converted to string are emitted on this port<br> + * <b>err</b>:tuples that could not be converted are emitted on this port<br> * * @displayName CsvFormatter * @category Formatter @@ -58,215 +68,152 @@ import com.datatorrent.netlet.util.DTThrowable; public class CsvFormatter extends Formatter<String> { - private ArrayList<Field> fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV; they automate the data type conversions, and enforce constraints. 
+ */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema. Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port + */ + @AutoMetric + private long errorTupleCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + private long emittedObjectCount; - public CsvFormatter() - { - fields = new ArrayList<Field>(); - fieldDelimiter = ','; - lineDelimiter = "\r\n"; + /** + * metric to keep count of number of tuples received on input port + */ + @AutoMetric + private long incomingTuplesCount; + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + errorTupleCount = 0; + emittedObjectCount = 0; + incomingTuplesCount = 0; } @Override public void setup(Context.OperatorContext context) { super.setup(context); - - //fieldInfo information - fields = new ArrayList<Field>(); - String[] fieldInfoTuple = fieldInfo.split(","); - for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { - field.setFormat(typeFormat[1]); - } - getFields().add(field); - } - preference = new CsvPreference.Builder('"', fieldDelimiter, 
lineDelimiter).build(); - int countKeyValue = getFields().size(); - nameMapping = new String[countKeyValue]; - processors = new CellProcessor[countKeyValue]; - initialise(nameMapping, processors); - + delimitedParserSchema = new DelimitedSchema(schema); + preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), + delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); + nameMapping = delimitedParserSchema.getFieldNames() + .toArray(new String[delimitedParserSchema.getFieldNames().size()]); + processors = getProcessor(delimitedParserSchema.getFields()); } - private void initialise(String[] nameMapping, CellProcessor[] processors) + /** + * Returns array of cellprocessors, one for each field + */ + private CellProcessor[] getProcessor(List<Field> fields) { - for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DATE) { - String dateFormat = getFields().get(i).format; - processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat)); + CellProcessor[] processor = new CellProcessor[fields.size()]; + int fieldCount = 0; + for (Field field : fields) { + if (field.getType() == FieldType.DATE) { + String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null + : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT); + processor[fieldCount++] = new Optional(new FmtDate(format == null ? 
"dd/MM/yyyy" : format)); } else { - processors[i] = new Optional(); + processor[fieldCount++] = new Optional(); } } - + return processor; } @Override public String convert(Object tuple) { + incomingTuplesCount++; + if (tuple == null) { + errorTupleCount++; + logger.error(" Null tuple", tuple); + return null; + } try { StringWriter stringWriter = new StringWriter(); ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference); beanWriter.write(tuple, nameMapping, processors); beanWriter.flush(); beanWriter.close(); + emittedObjectCount++; return stringWriter.toString(); } catch (SuperCsvException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); + logger.error("Error while converting tuple {} {}", tuple, e.getMessage()); + errorTupleCount++; } catch (IOException e) { DTThrowable.rethrow(e); } return null; } - public static class Field - { - String name; - String format; - FIELD_TYPE type; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public FIELD_TYPE getType() - { - return type; - } - - public void setType(String type) - { - this.type = FIELD_TYPE.valueOf(type); - } - - public String getFormat() - { - return format; - } - - public void setFormat(String format) - { - this.format = format; - } - } - /** - * Gets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. + * Get the schema * - * @return An array list of Fields. + * @return schema */ - public ArrayList<Field> getFields() + public String getSchema() { - return fields; + return schema; } /** - * Sets the array list of the fields, a field being a POJO containing the name - * of the field and type of field. + * Set the schema * - * @param fields - * An array list of Fields. 
+ * @param schema */ - public void setFields(ArrayList<Field> fields) + public void setSchema(String schema) { - this.fields = fields; + this.schema = schema; } - /** - * Gets the delimiter which separates fields in incoming data. - * - * @return fieldDelimiter - */ - public int getFieldDelimiter() + @VisibleForTesting + protected long getErrorTupleCount() { - return fieldDelimiter; + return errorTupleCount; } - /** - * Sets the delimiter which separates fields in incoming data. - * - * @param fieldDelimiter - */ - public void setFieldDelimiter(int fieldDelimiter) + @VisibleForTesting + protected long getEmittedObjectCount() { - this.fieldDelimiter = fieldDelimiter; + return emittedObjectCount; } - /** - * Gets the delimiter which separates lines in incoming data. - * - * @return lineDelimiter - */ - public String getLineDelimiter() - { - return lineDelimiter; - } - - /** - * Sets the delimiter which separates line in incoming data. - * - * @param lineDelimiter - */ - public void setLineDelimiter(String lineDelimiter) - { - this.lineDelimiter = lineDelimiter; - } - - /** - * Gets the name of the fields with type and format in data as comma separated - * string in same order as incoming data. e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @return fieldInfo - */ - public String getFieldInfo() - { - return fieldInfo; - } - - /** - * Sets the name of the fields with type and format in data as comma separated - * string in same order as incoming data. 
e.g - * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy - * - * @param fieldInfo - */ - public void setFieldInfo(String fieldInfo) + @VisibleForTesting + protected long getIncomingTuplesCount() { - this.fieldInfo = fieldInfo; + return incomingTuplesCount; } private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3df6715d/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java index 13d9739..4a1c770 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java @@ -18,7 +18,9 @@ */ package com.datatorrent.contrib.formatter; -import java.util.Date; +import java.io.IOException; + +import javax.validation.ConstraintViolationException; import org.joda.time.DateTime; import org.junit.Assert; @@ -27,13 +29,25 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; -import com.datatorrent.contrib.formatter.CsvFormatter; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.parser.CsvPOJOParserTest.Ad; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; public class CsvFormatterTest { + 
private static final String filename = "schema.json"; CsvFormatter operator; CollectorTestSink<Object> validDataSink; CollectorTestSink<String> invalidDataSink; @@ -49,8 +63,8 @@ public class CsvFormatterTest { super.starting(description); operator = new CsvFormatter(); - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); - operator.setLineDelimiter("\r\n"); + operator.setClazz(Ad.class); + operator.setSchema(SchemaUtils.jarResourceFileToString(filename)); validDataSink = new CollectorTestSink<Object>(); invalidDataSink = new CollectorTestSink<String>(); TestUtils.setSink(operator.out, validDataSink); @@ -70,95 +84,97 @@ public class CsvFormatterTest public void testPojoReaderToCsv() { operator.setup(null); - EmployeeBean emp = new EmployeeBean(); - emp.setName("john"); - emp.setDept("cs"); - emp.setEid(1); - emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); - operator.in.process(emp); + Ad ad = new Ad(); + ad.setCampaignId(9823); + ad.setAdId(1234); + ad.setAdName("ad"); + ad.setBidPrice(1.2); + ad.setStartDate( + new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate()); + ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate()); + ad.setSecurityCode(12345678); + ad.setParentCampaign("CAMP_AD"); + ad.setActive(true); + ad.setWeatherTargeted('y'); + ad.setValid("valid"); + operator.in.process(ad); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); String csvOp = (String)validDataSink.collectedTuples.get(0); Assert.assertNotNull(csvOp); - Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp); + Assert.assertEquals("1234,9823,ad,1.2,2015-01-01 00:00:00,01/01/2016,12345678,true,false,CAMP_AD,y,valid\r\n", + csvOp); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(0, operator.getErrorTupleCount()); + Assert.assertEquals(1, 
operator.getEmittedObjectCount()); } @Test - public void testPojoReaderToCsvMultipleDate() + public void testPojoReaderToCsvNullInput() { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); operator.setup(null); - EmployeeBean emp = new EmployeeBean(); - emp.setName("john"); - emp.setDept("cs"); - emp.setEid(1); - emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); - emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); - operator.in.process(emp); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String csvOp = (String)validDataSink.collectedTuples.get(0); - Assert.assertNotNull(csvOp); - Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp); + operator.in.process(null); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(1, operator.getErrorTupleCount()); + Assert.assertEquals(0, operator.getEmittedObjectCount()); + } - public static class EmployeeBean + @Test + public void testApplication() throws IOException, Exception { - - private String name; - private String dept; - private int eid; - private Date dateOfJoining; - private Date dateOfBirth; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public String getDept() - { - return dept; - } - - public void setDept(String dept) - { - this.dept = dept; - } - - public int getEid() - { - return eid; - } - - public void setEid(int eid) - { - this.eid = eid; - } - - public Date getDateOfJoining() - { - return dateOfJoining; + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new CsvParserApplication(), conf); + 
LocalMode.Controller lc = lma.getController(); + lc.run(5000);// runs for 5 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); } + } - public void setDateOfJoining(Date dateOfJoining) + public static class CsvParserApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) { - this.dateOfJoining = dateOfJoining; + PojoEmitter input = dag.addOperator("data", new PojoEmitter()); + CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter()); + dag.getMeta(formatter).getMeta(formatter.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, Ad.class); + formatter.setSchema(SchemaUtils.jarResourceFileToString("schema.json")); + ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator()); + ConsoleOutputOperator error = dag.addOperator("error", new ConsoleOutputOperator()); + output.setDebug(true); + dag.addStream("input", input.output, formatter.in); + dag.addStream("output", formatter.out, output.input); + dag.addStream("err", formatter.err, error.input); } + } - public Date getDateOfBirth() - { - return dateOfBirth; - } + public static class PojoEmitter extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>(); - public void setDateOfBirth(Date dateOfBirth) + @Override + public void emitTuples() { - this.dateOfBirth = dateOfBirth; + Ad ad = new Ad(); + ad.setCampaignId(9823); + ad.setAdId(1234); + ad.setAdName("ad"); + ad.setBidPrice(1.2); + ad.setStartDate( + new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate()); + ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate()); + ad.setSecurityCode(12345678); + ad.setParentCampaign("CAMP_AD"); + ad.setActive(true); + ad.setWeatherTargeted('y'); + ad.setValid("valid"); + output.emit(ad); } }
