Repository: apex-malhar
Updated Branches:
  refs/heads/master e01cf9c44 -> 1a9c75be9


APEXMALHAR-2257 Added documentation for Transform Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1a9c75be
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1a9c75be
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1a9c75be

Branch: refs/heads/master
Commit: 1a9c75be9bd3fe670fca8c31ff001a0fca931c05
Parents: e01cf9c
Author: chaitanya <[email protected]>
Authored: Fri Nov 25 16:29:41 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Fri Nov 25 16:29:41 2016 +0530

----------------------------------------------------------------------
 docs/operators/transform.md | 179 +++++++++++++++++++++++++++++++++++++++
 mkdocs.yml                  |   1 +
 2 files changed, 180 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1a9c75be/docs/operators/transform.md
----------------------------------------------------------------------
diff --git a/docs/operators/transform.md b/docs/operators/transform.md
new file mode 100644
index 0000000..95cda9a
--- /dev/null
+++ b/docs/operators/transform.md
@@ -0,0 +1,179 @@
+Transform - Operator Documentation
+==================================
+
+### About Transform operator
+----------------------------
+
+The Transform operator maps field expressions from input to output, or converts fields from one type to another.
+This operator is stateless. It receives objects on its input port; for each input object, it creates a new output object whose fields are computed as expressions involving fields of the input object.
+The types of the input and output objects are configurable, as are the expressions used to compute the output fields.
+
+The operator class is `TransformOperator`, located in the package `com.datatorrent.lib.transform`.
+Please refer to the [GitHub URL](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java) for `TransformOperator`.
+
+
+### Use Case
+------------
+
+Consider data that needs to be transformed to match an output schema.
+
+Consider input objects with these fields:
+
+| Name        | Type           |
+|-------------|----------------|
+| FirstName   | String         |
+| LastName    | String         |
+| Phone       | String         |
+| DateOfBirth | java.util.Date |
+| Address     | String         |
+ 
+ and output objects with fields: 
+ 
+| Name    | Type    |
+|---------|---------|
+| Name    | String  |
+| Phone   | String  |
+| Age     | Integer |
+| Address | String  |
+ 
+Suppose `Name` is a concatenation of `FirstName` and `LastName`, and `Age` is computed by subtracting the year of `DateOfBirth` from the current year.
+
+These simple computations can be expressed as Java expressions, where the input object is represented by `$`, and provided as configuration parameters as follows:
+
+```
+Name => {$.FirstName}.concat(" ").concat({$.LastName})
+Age => (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
+```
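
Outside the operator, the two expressions above correspond roughly to the plain-Java computation sketched below. This is an illustration only: the class `TransformExpressionSketch` and its method names are hypothetical and not part of Malhar, and the operator itself evaluates the configured expressions against the input object rather than calling code like this. The current date is passed in as a parameter (an assumption made here) so that the result is reproducible.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

// Hypothetical helper class, for illustration only; not part of Malhar.
public class TransformExpressionSketch {

  // Mirrors the Name expression: FirstName, a space, then LastName.
  public static String name(String firstName, String lastName) {
    return firstName.concat(" ").concat(lastName);
  }

  // Mirrors the Age expression: current year minus year of birth.
  // Date.getYear() is deprecated and returns (year - 1900); the offset
  // cancels out in the subtraction.
  @SuppressWarnings("deprecation")
  public static int age(Date now, Date dateOfBirth) {
    return now.getYear() - dateOfBirth.getYear();
  }

  public static void main(String[] args) {
    Date now = new GregorianCalendar(2016, Calendar.NOVEMBER, 25).getTime();
    Date dateOfBirth = new GregorianCalendar(1990, Calendar.JANUARY, 1).getTime();
    System.out.println(name("Ada", "Lovelace")); // Ada Lovelace
    System.out.println(age(now, dateOfBirth));   // 26
  }
}
```

Note that this expression yields a difference of calendar years; it ignores the month and day, so it can be off by one compared with a person's actual age.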
+
+### Configuration Parameters
+-----------------------------
+
+-   ***expressionMap*** -   Map<String, String>
+    -   Mandatory Parameter
+    -   Specifies the map between the output field (key) and the expression used to compute it (value) using fields of the input Java object.
+
+-   ***expressionFunctions*** -   List<String>
+    -   List of classes or methods to import so that expressions can use them. Setting this property overrides the default list.
+    -   Default Value = {java.lang.Math.*, org.apache.commons.lang3.StringUtils.*, org.apache.commons.lang3.StringEscapeUtils.*, org.apache.commons.lang3.time.DurationFormatUtils.*, org.apache.commons.lang3.time.DateFormatUtils.*}
+
+        
+-   ***copyMatchingFields*** -   boolean
+    -   Specifies whether matching fields should be copied; here, matching means the name and type of an input field are the same as the name and type of an output field.
+        If a matching field also appears in `expressionMap`, it is not copied; the expression is used for that output field instead.
+    -   Default Value = true.
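
The copy rule for `copyMatchingFields` can be sketched in plain Java as follows. This is a hypothetical illustration of the rule as described above, not the operator's actual implementation; the class `CopyMatchingFieldsSketch`, its methods, and the representation of a schema as a map from field name to type are all assumptions made for this example. The schemas are the ones from the use case.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the copy rule; not part of Malhar.
public class CopyMatchingFieldsSketch {

  // Returns the output fields that would be filled by copying: same name and
  // type on both sides, and no expression configured for that field.
  public static Set<String> fieldsToCopy(Map<String, Class<?>> inputFields,
                                         Map<String, Class<?>> outputFields,
                                         Set<String> expressionKeys) {
    Set<String> copied = new LinkedHashSet<>();
    for (Map.Entry<String, Class<?>> out : outputFields.entrySet()) {
      boolean matches = out.getValue().equals(inputFields.get(out.getKey()));
      if (matches && !expressionKeys.contains(out.getKey())) {
        copied.add(out.getKey());
      }
    }
    return copied;
  }

  // Applies the rule to the input and output schemas of the use case.
  public static Set<String> useCaseCopies(Set<String> expressionKeys) {
    Map<String, Class<?>> input = new LinkedHashMap<>();
    input.put("FirstName", String.class);
    input.put("LastName", String.class);
    input.put("Phone", String.class);
    input.put("DateOfBirth", java.util.Date.class);
    input.put("Address", String.class);

    Map<String, Class<?>> output = new LinkedHashMap<>();
    output.put("Name", String.class);
    output.put("Phone", String.class);
    output.put("Age", Integer.class);
    output.put("Address", String.class);

    return fieldsToCopy(input, output, expressionKeys);
  }

  public static void main(String[] args) {
    Set<String> expressionKeys = new LinkedHashSet<>();
    expressionKeys.add("Name");
    expressionKeys.add("Age");
    System.out.println(useCaseCopies(expressionKeys)); // [Phone, Address]
  }
}
```

With expressions configured only for `Name` and `Age`, the remaining output fields `Phone` and `Address` match input fields by name and type, so they are the ones copied.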
+    
+### Configuration Example
+-------------------------
+
+Consider input objects with fields:
+
+| Name      | Type                   |
+|-----------|------------------------|
+| FirstName | String                 |
+| LastName  | String                 |
+| StartDate | org.joda.time.DateTime |
+
+and output objects with fields:
+
+| Name       | Type    |
+|------------|---------|
+| Name       | String  |
+| isLeapYear | Boolean |
+
+Note: The `org.joda.time.DateTime` class is not present in the default list, so it must be added to `expressionFunctions` in the `populateDAG` method, as follows:
+```java
+TransformOperator operator = dag.addOperator("transform", new TransformOperator());
+operator.setExpressionFunctions(Arrays.asList("org.joda.time.DateTime", "org.apache.commons.lang3.StringUtils"));
+// Keys are output field names; values are the expressions, passed as strings
+Map<String,String> expressionMap = new HashMap<>();
+expressionMap.put("isLeapYear", "{$.StartDate}.year().isLeap()");
+expressionMap.put("Name", "org.apache.commons.lang3.StringUtils.joinWith(\" \", {$.FirstName},{$.LastName})");
+operator.setExpressionMap(expressionMap);
+```
+
+The above properties can also be set in the properties file as follows:
+
+```xml
+<property>
+  <name>dt.operator.transform.expressionFunctions[0]</name>
+  <value>org.joda.time.DateTime</value>
+</property>     
+<property>
+  <name>dt.operator.transform.expressionFunctions[1]</name>
+  <value>org.apache.commons.lang3.StringUtils</value>
+</property>
+<property>
+  <name>dt.operator.transform.expressionMap(isLeapYear)</name>
+  <value>{$.StartDate}.year().isLeap()</value>
+</property>
+<property>
+  <name>dt.operator.transform.expressionMap(Name)</name>
+  <value>org.apache.commons.lang3.StringUtils.joinWith(" ", {$.FirstName}, {$.LastName})</value>
+</property>
+```
+        
+### Ports
+----------
+
+-   ***input*** -   Port for input tuples.
+    -   Mandatory input port
+    
+-   ***output***    -   Port for transformed output tuples.
+    -   Mandatory output port
+
+### Attributes
+---------------
+-   ***Input port Attribute - input.TUPLE\_CLASS*** - Fully qualified class name; the class should be Kryo serializable.
+    -   Mandatory attribute
+    -   Type of input tuple.
+
+-   ***Output port Attribute - output.TUPLE\_CLASS*** - Fully qualified class name; the class should be Kryo serializable.
+    -   Mandatory attribute
+    -   Type of output tuple.
+    
+### Application Example
+------------------------
+
+Please refer to the [Example](https://github.com/DataTorrent/examples/tree/master/tutorials/transform) for a sample transform application.
+
+### Partitioning
+----------------
+Being stateless, this operator can be partitioned using any of the built-in partitioners present in the Malhar library by setting a few properties as follows:
+
+#### Stateless partitioning
+Stateless partitioning ensures that TransformOperator is partitioned at application startup and remains partitioned throughout the lifetime of the DAG.
+TransformOperator can be statelessly partitioned by adding the following lines to properties.xml:
+
+```xml
+  <property>
+    <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:{N}</value>
+  </property>
+```
+
+where {OperatorName} is the name of the TransformOperator operator and {N} is the number of static partitions.
+The above lines will statically partition TransformOperator into {N} partitions.
+
+#### Dynamic Partitioning
+Dynamic partitioning is a feature of the Apex platform that changes the partitioning of the operator based on certain conditions.
+TransformOperator can be dynamically partitioned using the partitioner below:
+
+##### Throughput based
+The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition TransformOperator:
+```java
+// `transform` is the TransformOperator instance previously added to the DAG
+StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
+partitioner.setCooldownMillis(10000);
+partitioner.setMaximumEvents(30000);
+partitioner.setMinimumEvents(10000);
+dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);
+```
+
+The above code will dynamically partition TransformOperator when the throughput changes.
+If the overall throughput of TransformOperator goes above 30000 or falls below 10000, the platform will repartition TransformOperator
+to keep the throughput of a single partition between 10000 and 30000.
+A CooldownMillis of 10000 is used as the threshold time over which the throughput change is observed.
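
As a simplified reading of the thresholds described above, the decision can be pictured as a band check on the observed throughput. The sketch below is hypothetical: the class `ThroughputBandSketch` and its `load` method are not part of Apex or Malhar, the cooldown period is not modeled, and the actual decision is made inside `StatelessThroughputBasedPartitioner`.

```java
// Hypothetical illustration of the throughput band; not part of Apex or Malhar.
public class ThroughputBandSketch {

  // Returns +1 when throughput is above the maximum (more partitions needed),
  // -1 when it is below the minimum (fewer partitions needed), and 0 when it
  // lies inside the band and no repartitioning is called for.
  public static int load(long eventsObserved, long minimumEvents, long maximumEvents) {
    if (eventsObserved > maximumEvents) {
      return 1;
    }
    if (eventsObserved < minimumEvents) {
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) {
    long minimumEvents = 10000;
    long maximumEvents = 30000;
    System.out.println(load(35000, minimumEvents, maximumEvents)); // 1
    System.out.println(load(20000, minimumEvents, maximumEvents)); // 0
    System.out.println(load(5000, minimumEvents, maximumEvents));  // -1
  }
}
```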
+
+Source code for this dynamic application can be found 
[here](https://github.com/DataTorrent/examples/blob/master/tutorials/transform/src/main/java/com/example/transform/DynamicTransformApplication.java).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1a9c75be/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index 3dd0137..175850a 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -17,4 +17,5 @@ pages:
     - Windowed Operator: operators/windowedOperator.md
     - Json Parser: operators/jsonParser.md
     - Json Formatter: operators/jsonFormatter.md
+    - Transform Operator: operators/transform.md
 
