[
https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310784#comment-15310784
]
ASF GitHub Bot commented on APEXMALHAR-2105:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
public class CsvFormatter extends Formatter<String>
{
- private ArrayList<Field> fields;
- @NotNull
- protected String classname;
- @NotNull
- protected int fieldDelimiter;
- protected String lineDelimiter;
+ /**
+ * Names of all the fields in the same order that would appear in output
+ * records
+ */
+ private transient String[] nameMapping;
+ /**
+ * Cell processors are an integral part of reading and writing with
Super CSV
+ * they automate the data type conversions, and enforce constraints.
+ */
+ private transient CellProcessor[] processors;
+ /**
+ * Writing preferences that are passed through schema
+ */
+ private transient CsvPreference preference;
+ /**
+ * Contents of the schema. The schema is specified in a json format as per
+ * {@link DelimitedSchema}
+ */
@NotNull
- protected String fieldInfo;
+ private String schema;
+ /**
+ * Schema is read into this object to access fields
+ */
+ private transient DelimitedSchema delimitedParserSchema;
- public enum FIELD_TYPE
- {
- BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
- };
+ /**
+ * metric to keep count of number of tuples emitted on error port
+ */
+ @AutoMetric
+ protected long errorTupleCount;
+
+ /**
+ * metric to keep count of number of tuples emitted on out port
+ */
+ @AutoMetric
+ protected long emittedObjectCount;
- protected transient String[] nameMapping;
- protected transient CellProcessor[] processors;
- protected transient CsvPreference preference;
+ /**
+ * metric to keep count of number of tuples emitted on input port
+ */
+ @AutoMetric
+ protected long incomingTuplesCount;
public CsvFormatter()
{
- fields = new ArrayList<Field>();
- fieldDelimiter = ',';
- lineDelimiter = "\r\n";
+ }
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ errorTupleCount = 0;
+ emittedObjectCount = 0;
+ incomingTuplesCount = 0;
}
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
-
- //fieldInfo information
- fields = new ArrayList<Field>();
- String[] fieldInfoTuple = fieldInfo.split(",");
- for (int i = 0; i < fieldInfoTuple.length; i++) {
- String[] fieldTuple = fieldInfoTuple[i].split(":");
- Field field = new Field();
- field.setName(fieldTuple[0]);
- String[] typeFormat = fieldTuple[1].split("\\|");
- field.setType(typeFormat[0].toUpperCase());
- if (typeFormat.length > 1) {
- field.setFormat(typeFormat[1]);
- }
- getFields().add(field);
- }
- preference = new CsvPreference.Builder('"', fieldDelimiter,
lineDelimiter).build();
- int countKeyValue = getFields().size();
- nameMapping = new String[countKeyValue];
- processors = new CellProcessor[countKeyValue];
- initialise(nameMapping, processors);
-
+ delimitedParserSchema = new DelimitedSchema(schema);
+ preference = new
CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+ delimitedParserSchema.getDelimiterChar(),
delimitedParserSchema.getLineDelimiter()).build();
+ nameMapping = delimitedParserSchema.getFieldNames()
+ .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+ processors = getProcessor(delimitedParserSchema.getFields());
}
- private void initialise(String[] nameMapping, CellProcessor[] processors)
+ /**
+ * Returns array of cellprocessors, one for each field
+ */
+ private CellProcessor[] getProcessor(List<Field> fields)
{
- for (int i = 0; i < getFields().size(); i++) {
- FIELD_TYPE type = getFields().get(i).type;
- nameMapping[i] = getFields().get(i).name;
- if (type == FIELD_TYPE.DATE) {
- String dateFormat = getFields().get(i).format;
- processors[i] = new Optional(new FmtDate(dateFormat == null ?
"dd/MM/yyyy" : dateFormat));
+ CellProcessor[] processor = new CellProcessor[fields.size()];
+ int fieldCount = 0;
+ for (Field field : fields) {
+ if (field.getType() == FieldType.DATE) {
+ String format =
field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
+ :
(String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
+ processor[fieldCount++] = new Optional(new FmtDate(format == null
? "dd/MM/yyyy" : format));
} else {
- processors[i] = new Optional();
+ processor[fieldCount++] = new Optional();
}
}
-
+ return processor;
}
@Override
public String convert(Object tuple)
{
+ incomingTuplesCount++;
+ if (tuple == null) {
+ errorTupleCount++;
+ logger.error(" Null tuple", tuple);
+ return null;
+ }
try {
StringWriter stringWriter = new StringWriter();
ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter,
preference);
beanWriter.write(tuple, nameMapping, processors);
beanWriter.flush();
beanWriter.close();
+ emittedObjectCount++;
return stringWriter.toString();
} catch (SuperCsvException e) {
- logger.debug("Error while converting tuple {}
{}",tuple,e.getMessage());
+ logger.error("Error while converting tuple {} {}", tuple,
e.getMessage());
+ errorTupleCount++;
} catch (IOException e) {
DTThrowable.rethrow(e);
}
return null;
}
- public static class Field
- {
- String name;
- String format;
- FIELD_TYPE type;
-
- public String getName()
- {
- return name;
- }
-
- public void setName(String name)
- {
- this.name = name;
- }
-
- public FIELD_TYPE getType()
- {
- return type;
- }
-
- public void setType(String type)
- {
- this.type = FIELD_TYPE.valueOf(type);
- }
-
- public String getFormat()
- {
- return format;
- }
-
- public void setFormat(String format)
- {
- this.format = format;
- }
- }
-
/**
- * Gets the array list of the fields, a field being a POJO containing
the name
- * of the field and type of field.
+ * Get the schema
*
- * @return An array list of Fields.
+ * @return schema
*/
- public ArrayList<Field> getFields()
+ public String getSchema()
{
- return fields;
+ return schema;
}
/**
- * Sets the array list of the fields, a field being a POJO containing
the name
- * of the field and type of field.
+ * Set the schema
*
- * @param fields
- * An array list of Fields.
+ * @param schema
*/
- public void setFields(ArrayList<Field> fields)
+ public void setSchema(String schema)
{
- this.fields = fields;
+ this.schema = schema;
}
- /**
- * Gets the delimiter which separates fields in incoming data.
- *
- * @return fieldDelimiter
- */
- public int getFieldDelimiter()
+ @VisibleForTesting
+ public long getErrorTupleCount()
{
- return fieldDelimiter;
+ return errorTupleCount;
}
- /**
- * Sets the delimiter which separates fields in incoming data.
- *
- * @param fieldDelimiter
- */
- public void setFieldDelimiter(int fieldDelimiter)
+ @VisibleForTesting
+ public long getEmittedObjectCount()
--- End diff --
You can make this protected? As the testcase will be in the same package,
it'll be accessible via protected as well.
> Enhance CSV Formatter to take in schema similar to Csv Parser
> -------------------------------------------------------------
>
> Key: APEXMALHAR-2105
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: shubham pathak
>
> CSV Parser takes in a schema that specifies field names and constraints. CSV
> Formatter also needs same information, but in the current implementation , it
> takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema
> as CSV Parser would make it simpler for the end user.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)