Repository: apex-malhar
Updated Branches:
  refs/heads/master 4ab457f18 -> 0885bfad2
APEXMALHAR-2183 Csv formatter documetation

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0885bfad
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0885bfad
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0885bfad

Branch: refs/heads/master
Commit: 0885bfad2d1f35287b8a319948cb897f371b9a2e
Parents: 4ab457f
Author: venkateshDT <[email protected]>
Authored: Tue Jan 10 00:43:44 2017 -0800
Committer: venkateshDT <[email protected]>
Committed: Mon Jan 16 01:24:53 2017 -0800

----------------------------------------------------------------------
 docs/operators/csvformatter.md | 115 ++++++++++++++++++++++++++++++++++++
 mkdocs.yml                     |   2 +-
 2 files changed, 116 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0885bfad/docs/operators/csvformatter.md
----------------------------------------------------------------------
diff --git a/docs/operators/csvformatter.md b/docs/operators/csvformatter.md
new file mode 100644
index 0000000..6b35d8c
--- /dev/null
+++ b/docs/operators/csvformatter.md
@@ -0,0 +1,115 @@
+CsvFormatter
+============
+
+## Operator Objective
+This operator receives a POJO ([Plain Old Java Object](https://en.wikipedia.org/wiki/Plain_Old_Java_Object)) as an incoming tuple, converts the data in
+the incoming POJO to a custom delimited string and emits the delimited string.
+
+CsvFormatter supports schema definition as a JSON string.
+
+CsvFormatter does not hold any state and is **idempotent**, **fault-tolerant** and **statically/dynamically partitionable**.
+
+## Operator Information
+1. Operator location: ***malhar-contrib***
+2. Available since: ***3.2.0***
+3. Operator state: ***Evolving***
+4. Java Packages:
+    * Operator: ***[com.datatorrent.contrib.formatter.CsvFormatter](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/formatter/CsvFormatter.html)***
+
+## Properties, Attributes and Ports
+### <a name="props"></a>Properties of CsvFormatter
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *schema* | Contents of the schema. The schema is specified in JSON format. | String | Yes | N/A |
+
+
+### Platform Attributes that influence operator behavior
+| **Attribute** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *in.TUPLE_CLASS* | TUPLE_CLASS attribute on the input port, which tells the operator the class of the incoming POJO | Class or FQCN | Yes |
+
+
+### Ports
+| **Port** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *in* | Tuples that need to be formatted are received on this port | Object (POJO) | Yes |
+| *out* | Tuples that are formatted are emitted from this port | String | No |
+| *err* | Tuples that could not be converted are emitted on this port | Object | No |
+
+## Limitations
+The current CsvFormatter has the following limitations:
+
+1. The field names in the schema and the POJO field names must match. For example, if the name of a schema field is "customerName", then the POJO must contain a field with the same name.
+2. Field-wise validation/formatting is not yet supported.
+3. The fields are written to the output in the same order as they are specified in the schema.
+
+## Example
+An example for CsvFormatter can be found at: [https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter](https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter)
+
+## Advanced
+
+### <a name="JSONFileFormat"></a> Schema format for CsvFormatter
+CsvFormatter expects the schema to be a String in JSON format.
+
+
+Example of the schema format:
+```json
+{
+  "separator": ",",
+  "quoteChar": "\"",
+  "lineDelimiter": "\n",
+  "fields": [
+    {
+      "name": "campaignId",
+      "type": "Integer"
+    },
+    {
+      "name": "startDate",
+      "type": "Date",
+      "constraints": {
+        "format": "yyyy-MM-dd"
+      }
+    }
+  ]
+}
+```
+
+
+### Partitioning of CsvFormatter
+Because CsvFormatter is a stateless operator, the built-in partitioners present in the Malhar library can be used with it directly by setting properties as follows:
+
+#### Stateless partitioning of CsvFormatter
+Stateless partitioning ensures that CsvFormatter is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG.
+CsvFormatter can be partitioned with the stateless partitioner by adding the following lines to properties.xml:
+
+```xml
+  <property>
+    <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
+  </property>
+```
+
+where {OperatorName} is the name of the CsvFormatter operator.
+The above lines statically partition CsvFormatter into 2 partitions. Change the number after the colon in the value to use a different number of static partitions.
+
+
+#### Dynamic Partitioning of CsvFormatter
+Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator at runtime based on certain conditions.
+CsvFormatter can be dynamically partitioned using the following out-of-the-box partitioner:
+
+##### Throughput based
+The following code can be added to the populateDAG method of the application to dynamically partition CsvFormatter:
+```java
+  StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new StatelessThroughputBasedPartitioner<>();
+  partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+  partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+  partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+  dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+  dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);
+```
+
+The above code dynamically partitions CsvFormatter when its throughput changes.
+If the overall throughput of CsvFormatter goes above 30000 or below 10000, the platform repartitions CsvFormatter
+to keep the throughput of a single partition between 10000 and 30000.
+A CooldownMillis of 10000 (10 seconds) is used as the threshold time over which a throughput change is observed.
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0885bfad/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index d19cb7c..6a17cc5 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -19,4 +19,4 @@ pages:
     - Json Formatter: operators/jsonFormatter.md
     - Transform Operator: operators/transform.md
     - Xml Parser: operators/xmlParserOperator.md
-
+    - Csv Formatter: operators/csvformatter.md
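The CsvFormatter documentation in this commit describes the conversion only in prose: POJO fields are looked up by the names listed in the schema, written in schema order, joined with `separator`, quoted with `quoteChar` and terminated with `lineDelimiter`, with a `Date` field rendered using its `constraints.format` pattern. The following is a minimal, hypothetical plain-Java sketch of that idea. It is not CsvFormatter's implementation and uses no Apex or Malhar APIs. The class names `CsvFormatSketch`, `FieldSpec` and `Campaign` are illustrative assumptions; the schema is built in code instead of being parsed from JSON, fields are read as public fields via reflection, dates are formatted in UTC, and the quoting rule (quote a value that contains the separator, the quote character or a line break, and double any embedded quote characters) is an assumed RFC 4180-style convention.

```java
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class CsvFormatSketch {

  /** Hypothetical stand-in for one entry of the schema's "fields" array. */
  static final class FieldSpec {
    final String name;
    final String dateFormat; // "constraints.format" for Date fields, otherwise null

    FieldSpec(String name, String dateFormat) {
      this.name = name;
      this.dateFormat = dateFormat;
    }
  }

  /** Hypothetical POJO whose field names match the schema field names. */
  public static class Campaign {
    public Integer campaignId;
    public Date startDate;
  }

  /** Wraps a value in quoteChar if it contains the separator, quoteChar or a line break. */
  static String quoteIfNeeded(String value, char separator, char quoteChar) {
    boolean needsQuotes = value.indexOf(separator) >= 0 || value.indexOf(quoteChar) >= 0
        || value.indexOf('\n') >= 0 || value.indexOf('\r') >= 0;
    if (!needsQuotes) {
      return value;
    }
    String q = String.valueOf(quoteChar);
    return q + value.replace(q, q + q) + q; // embedded quote chars are doubled (assumed rule)
  }

  /** Formats one POJO as a delimited line, emitting fields in schema order. */
  static String format(Object pojo, List<FieldSpec> fields, char separator, char quoteChar,
      String lineDelimiter) {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < fields.size(); i++) {
      FieldSpec spec = fields.get(i);
      Object value;
      try {
        Field f = pojo.getClass().getField(spec.name); // schema name must match a public field
        value = f.get(pojo);
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot read field '" + spec.name + "'", e);
      }
      String text;
      if (value == null) {
        text = ""; // nulls become empty columns in this sketch
      } else if (value instanceof Date && spec.dateFormat != null) {
        SimpleDateFormat fmt = new SimpleDateFormat(spec.dateFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: format dates in UTC
        text = fmt.format((Date) value);
      } else {
        text = value.toString();
      }
      if (i > 0) {
        line.append(separator);
      }
      line.append(quoteIfNeeded(text, separator, quoteChar));
    }
    return line.append(lineDelimiter).toString();
  }

  public static void main(String[] args) {
    Campaign c = new Campaign();
    c.campaignId = 42;
    c.startDate = new Date(0L); // 1970-01-01T00:00:00Z
    List<FieldSpec> schema = Arrays.asList(
        new FieldSpec("campaignId", null),
        new FieldSpec("startDate", "yyyy-MM-dd"));
    System.out.print(format(c, schema, ',', '"', "\n")); // prints 42,1970-01-01
  }
}
```

Running `main` prints `42,1970-01-01`. In this sketch a schema field with no matching POJO field raises an `IllegalArgumentException`; according to the Ports table in the documentation, the operator itself emits tuples that could not be converted on its `err` port.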
