Repository: apex-malhar
Updated Branches:
  refs/heads/master 2e2dfc5c0 -> 0ec1433b2
APEXMALHAR-2153 Adding user docs for POJOEnricher.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0ec1433b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0ec1433b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0ec1433b

Branch: refs/heads/master
Commit: 0ec1433b25c0df2db1c6e21f0ce2b88d1251deaf
Parents: 2e2dfc5
Author: Chinmay <[email protected]>
Authored: Tue Jul 26 17:28:36 2016 +0530
Committer: Chinmay <[email protected]>
Committed: Mon Aug 1 14:09:43 2016 +0530

----------------------------------------------------------------------
 docs/operators/enricher.md | 169 ++++++++++++++++++++++++++++++++++++++++
 mkdocs.yml                 |   1 +
 2 files changed, 170 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0ec1433b/docs/operators/enricher.md
----------------------------------------------------------------------
diff --git a/docs/operators/enricher.md b/docs/operators/enricher.md
new file mode 100644
index 0000000..ad0d035
--- /dev/null
+++ b/docs/operators/enricher.md
@@ -0,0 +1,169 @@

POJO Enricher
=============

## Operator Objective
This operator receives a POJO ([Plain Old Java Object](https://en.wikipedia.org/wiki/Plain_Old_Java_Object)) as an incoming tuple, uses an external source to enrich the data in
the incoming tuple, and emits the enriched data as a new POJO.

POJOEnricher supports enrichment from the following external sources:

1. **JSON File Based** - Reads a file whose content is stored in JSON format into memory and uses it to enrich the data. This is done using the FSLoader implementation.
2. **JDBC Based** - Any JDBC store can act as an external entity from which the enricher requests data for enriching incoming tuples. This is done using the JDBCLoader implementation.

POJOEnricher does not hold any state and is **idempotent**, **fault-tolerant** and **statically/dynamically partitionable**.

## Operator Use Cases
1. Bank ***transaction records*** usually contain a customerId. For further analysis of a transaction, one needs the customer name and other customer-related information.
Such information is present in another database. The transaction record can be enriched with customer information using POJOEnricher.
2. A ***Call Data Record (CDR)*** contains only the mobile/telephone number of the customer; customer information is missing from the CDR. POJOEnricher can be used to enrich
CDRs with customer data for further analysis.

## Operator Information
1. Operator location: ***malhar-contrib***
2. Available since: ***3.4.0***
3. Operator state: ***Evolving***
4. Java Packages:
    * Operator: ***[com.datatorrent.contrib.enrich.POJOEnricher](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/POJOEnricher.html)***
    * FSLoader: ***[com.datatorrent.contrib.enrich.FSLoader](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/FSLoader.html)***
    * JDBCLoader: ***[com.datatorrent.contrib.enrich.JDBCLoader](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/JDBCLoader.html)***

## Properties, Attributes and Ports
### <a name="props"></a>Properties of POJOEnricher
| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
| -------- | ----------- | ---- | ------------------ | ------------- |
| *includeFields* | List of fields from the database that need to be added to the output POJO | List<String\> | Yes | N/A |
| *lookupFields* | List of fields from the input POJO which form a *unique composite* key for querying the store | List<String\> | Yes | N/A |
| *store* | Backend store from which data is queried for enrichment | [BackendStore](#backendStore) | Yes | N/A |
| *cacheExpirationInterval* | Cache entry expiry in ms. After this time, the lookup to the store is done again for the given key | int | No | 1 * 60 * 60 * 1000 (1 hour) |
| *cacheCleanupInterval* | Interval in ms after which stale entries are removed from the cache | int | No | 1 * 60 * 60 * 1000 (1 hour) |
| *cacheSize* | Number of entries in the cache after which LRU-based eviction starts on each addition | int | No | 1000 |

#### <a name="backendStore"></a>Properties of FSLoader (BackendStore)
| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
| -------- | ----------- | ---- | ------------------ | ------------- |
| *fileName* | Path of the file whose data will be used for enrichment. See [here](#JSONFileFormat) for the JSON file format. | String | Yes | N/A |


#### Properties of JDBCLoader (BackendStore)
| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
| -------- | ----------- | ---- | ------------------ | ------------- |
| *databaseUrl* | Connection string for connecting to the JDBC store | String | Yes | N/A |
| *databaseDriver* | JDBC driver class for connecting to the JDBC store. This driver should be present in the classpath | String | Yes | N/A |
| *tableName* | Name of the table from which data needs to be retrieved | String | Yes | N/A |
| *connectionProperties* | Comma-separated list of advanced connection properties that need to be passed to the JDBC driver, e.g. *prop1:val1,prop2:val2* | String | No | null |
| *queryStmt* | Select statement used to query the data. This is an optional parameter for advanced queries | String | No | null |


### Platform Attributes that influence operator behavior
| **Attribute** | **Description** | **Type** | **Mandatory** |
| -------- | ----------- | ---- | ------------------ |
| *input.TUPLE_CLASS* | TUPLE_CLASS attribute on the input port which tells the operator the class of the incoming POJO | Class or FQCN | Yes |
| *output.TUPLE_CLASS* | TUPLE_CLASS attribute on the output port which tells the operator the class of the POJO to be emitted | Class or FQCN | Yes |


### Ports
| **Port** | **Description** | **Type** | **Mandatory** |
| -------- | ----------- | ---- | ------------------ |
| *input* | Tuples that need to be enriched are received on this port | Object (POJO) | Yes |
| *output* | Tuples that are enriched from the external source are emitted on this port | Object (POJO) | No |
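The following is a minimal sketch, not taken from a shipped example, of how the properties and attributes above could be configured inside an application's populateDAG method. The operator name, stream names, POJO classes, file path, connection string and the upstream/downstream operators (reader, writer) are all illustrative, and the setters are assumed from the property tables above.

```java
// Backend store, option 1: FSLoader reads a JSON file (see the file format section below)
FSLoader fsStore = new FSLoader();
fsStore.setFileName("/user/enrich/customers.json");              // placeholder path

// Backend store, option 2: JDBCLoader queries a database table
JDBCLoader jdbcStore = new JDBCLoader();
jdbcStore.setDatabaseUrl("jdbc:mysql://dbhost:3306/customerdb"); // placeholder connection string
jdbcStore.setDatabaseDriver("com.mysql.jdbc.Driver");            // driver must be present in the classpath
jdbcStore.setTableName("CUSTOMER");                              // placeholder table name

POJOEnricher enricher = dag.addOperator("Enricher", POJOEnricher.class);
enricher.setStore(jdbcStore);                                    // either store can be used here
enricher.setLookupFields(Arrays.asList("customerId"));           // key field present in the incoming POJO
enricher.setIncludeFields(Arrays.asList("customerName"));        // field copied from the store into the outgoing POJO

// TUPLE_CLASS attributes tell the operator which POJO classes are received and emitted
dag.setInputPortAttribute(enricher.input, Context.PortContext.TUPLE_CLASS, TransactionPOJO.class);
dag.setOutputPortAttribute(enricher.output, Context.PortContext.TUPLE_CLASS, EnrichedTransactionPOJO.class);

// Wire the enricher between an upstream reader and a downstream writer
dag.addStream("transactions", reader.output, enricher.input);
dag.addStream("enriched", enricher.output, writer.input);
```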
## Limitations
The current POJOEnricher has the following limitations:

1. FSLoader loads the file content in memory. Although it loads only the composite key and composite value in memory, a very large amount of data would bloat the memory and make the operator go OOM. In case the file size is large, allocate sufficient memory to the POJOEnricher.
2. The incoming POJO should be a subset of the outgoing POJO (see the sketch after this list).
3. The [includeFields](#props) property should contain fields having the same name in the database column as well as in the outgoing POJO. For example, if the name of the database column is "customerName", then the outgoing POJO should contain a field with the same name and the same should be added to includeFields.
4. The [lookupFields](#props) property should contain fields having the same name in the database column as well as in the incoming POJO. For example, if the name of the database column is "customerId", then the incoming POJO should contain a field with the same name and the same should be added to lookupFields.
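For illustration, the hypothetical POJOs below (the classes referenced in the earlier wiring sketch) follow these rules for the bank-transaction use case: the incoming POJO is a subset of the outgoing one, the lookup field customerId is named after its database column, and the enriched field customerName is named after the column listed in includeFields.

```java
// Incoming POJO: only the fields present in the raw transaction record
public class TransactionPOJO
{
  private long transactionId;
  private long customerId;      // same name as the database column, listed in lookupFields
  private double amount;
  // getters and setters omitted for brevity
}

// Outgoing POJO: a superset of the incoming POJO plus the enriched field
public class EnrichedTransactionPOJO
{
  private long transactionId;
  private long customerId;
  private double amount;
  private String customerName;  // same name as the database column, listed in includeFields
  // getters and setters omitted for brevity
}
```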
## Example
An example for POJOEnricher can be found at: [https://github.com/DataTorrent/examples/tree/master/tutorials/enricher](https://github.com/DataTorrent/examples/tree/master/tutorials/enricher)

## Advanced

### <a name="JSONFileFormat"></a> File format for JSON based FSLoader
FSLoader expects the file to be in a specific format:

1. Each line makes one record which becomes part of the store.
2. Each line is a valid JSON object where the *key* is the field name and the *value* is the field value.

An example of the format looks like the following:
```json
{"circleId":0, "circleName":"A"}
{"circleId":1, "circleName":"B"}
{"circleId":2, "circleName":"C"}
{"circleId":3, "circleName":"D"}
{"circleId":4, "circleName":"E"}
{"circleId":5, "circleName":"F"}
{"circleId":6, "circleName":"G"}
{"circleId":7, "circleName":"H"}
{"circleId":8, "circleName":"I"}
{"circleId":9, "circleName":"J"}
```

### Caching mechanism in POJOEnricher
POJOEnricher contains a cache which makes key lookups more efficient. This is especially useful when the data in the external store does not change much.
However, one should carefully tune the [cacheExpirationInterval](#props) property for desirable results.

On every incoming tuple, POJOEnricher first queries the cache. If the cache contains the desired record and it is within the expiration interval, the cached record is used to
enrich the tuple; otherwise a lookup is made to the configured store and the returned value is used to enrich the tuple. The returned value is then cached against the composite key.

POJOEnricher caches only the fields required for enrichment and not all fields returned by the external store. This ensures optimal use of memory.


### Partitioning of POJOEnricher
Being a stateless operator, POJOEnricher can use the built-in partitioners present in the Malhar library directly, simply by setting a few properties as follows:

#### Stateless partitioning of POJOEnricher
Stateless partitioning ensures that POJOEnricher is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG.
POJOEnricher can be statelessly partitioned by adding the following lines to properties.xml:

```xml
  <property>
    <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
  </property>
```

where {OperatorName} is the name of the POJOEnricher operator.
The above lines will statically partition POJOEnricher into 2 partitions. The value can be changed accordingly to change the number of static partitions.
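The same static partitioning can also be applied programmatically instead of through properties.xml. A minimal sketch, assuming the *enricher* variable from the earlier wiring sketch:

```java
// Statically partition the enricher into 2 partitions from populateDAG
dag.setAttribute(enricher, OperatorContext.PARTITIONER, new StatelessPartitioner<POJOEnricher>(2));
```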
#### Dynamic Partitioning of POJOEnricher
Dynamic partitioning is a feature of the Apex platform which changes the number of partitions of an operator based on certain conditions.
POJOEnricher can be dynamically partitioned using 2 out-of-the-box partitioners:

##### Throughput based
The following code can be added to the populateDAG method of the application to dynamically partition POJOEnricher:
```java
// COOL_DOWN_MILLIS, MAX_THROUGHPUT and MIN_THROUGHPUT are configuration keys defined by the application
StatelessThroughputBasedPartitioner<POJOEnricher> partitioner = new StatelessThroughputBasedPartitioner<>();
partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
dag.setAttribute(pojoEnricherObj, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(pojoEnricherObj, OperatorContext.PARTITIONER, partitioner);
```

The above code will dynamically partition POJOEnricher when the throughput changes.
If the overall throughput of POJOEnricher goes above 30000 or below 10000, the platform will repartition POJOEnricher
to balance the throughput of a single partition to be between 10000 and 30000.
A cooldownMillis of 10000 is used as the threshold time over which the throughput change is observed.

##### Latency based
The following code can be added to the populateDAG method of the application to dynamically partition POJOEnricher:
```java
// COOL_DOWN_MILLIS, MAX_LATENCY and MIN_LATENCY are configuration keys defined by the application
StatelessLatencyBasedPartitioner<POJOEnricher> partitioner = new StatelessLatencyBasedPartitioner<>();
partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
partitioner.setMaximumLatency(conf.getLong(MAX_LATENCY, 10));
partitioner.setMinimumLatency(conf.getLong(MIN_LATENCY, 3));
dag.setAttribute(pojoEnricherObj, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(pojoEnricherObj, OperatorContext.PARTITIONER, partitioner);
```

The above code will dynamically partition POJOEnricher when the overall latency of POJOEnricher changes.
If the overall latency of POJOEnricher goes above 10 ms or below 3 ms, the platform will repartition POJOEnricher
to balance the latency of a single partition to be between 3 ms and 10 ms.
A cooldownMillis of 10000 is used as the threshold time over which the latency change is observed.


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0ec1433b/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index fdbc375..de28476 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -8,3 +8,4 @@ pages:
 - File Splitter: operators/file_splitter.md
 - Block Reader: operators/block_reader.md
 - File Output: operators/file_output.md
+- Enricher: operators/enricher.md
