http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/operators/kafkaInputOperator/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/operators/kafkaInputOperator/index.html 
b/docs/malhar-3.7/operators/kafkaInputOperator/index.html
new file mode 100644
index 0000000..e10200d
--- /dev/null
+++ b/docs/malhar-3.7/operators/kafkaInputOperator/index.html
@@ -0,0 +1,773 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Kafka Input - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Kafka Input";
+    var mkdocs_page_input_path = "operators/kafkaInputOperator.md";
+    var mkdocs_page_url = "/operators/kafkaInputOperator/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Kafka Input</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#kafka-input-operator">KAFKA 
INPUT OPERATOR</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#introduction">Introduction</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#kafka-input-operator-for-kafka-08x">Kafka Input Operator for Kafka 
0.8.x</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#abstractkafkainputoperator">AbstractKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#kafkaconsumer">KafkaConsumer</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#pre-requisites">Pre-requisites</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#offsetmanager">OffsetManager</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#abstractsingleportkafkainputoperator">AbstractSinglePortKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#concrete-classes">Concrete Classes</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#application-example">Application Example</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#kafka-input-operator-for-kafka-09x">Kafka Input Operator for Kafka 
0.9.x</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#abstractkafkainputoperator_1">AbstractKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#concrete-classes_1">Concrete Classes</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#application-example_1">Application Example</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../s3outputmodule/">S3 Output Module</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Kafka Input</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="kafka-input-operator">KAFKA INPUT OPERATOR</h1>
+<h3 id="introduction">Introduction</h3>
+<p><a href="http://kafka.apache.org">Apache Kafka</a> is a pull-based, 
distributed publish-subscribe messaging system in which
+topics are partitioned and replicated across nodes. </p>
+<p>The Kafka input operator consumes data from the partitions of a Kafka topic 
for processing in Apex. 
+The operator has the ability to automatically scale with the Kafka 
partitioning for high throughput. 
+It is fault-tolerant (consumer offset checkpointing) and guarantees 
idempotency to allow exactly-once results in the downstream pipeline.</p>
+<p>For more information about the operator design see this <a 
href="http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator">presentation</a>
+and for processing guarantees this <a 
href="https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/">blog</a>.</p>
+<p>There are two separate implementations of the input operator,
+one built against Kafka 0.8 client and a newer version for the
+Kafka 0.9 consumer API that also works with MapR Streams.
+These reside in different packages and are described separately below.</p>
+<h3 id="kafka-input-operator-for-kafka-08x">Kafka Input Operator for Kafka 
0.8.x</h3>
+<p>Package: <code>com.datatorrent.contrib.kafka</code></p>
+<p>Maven artifact: <a 
href="https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib">malhar-contrib</a></p>
+<h3 id="abstractkafkainputoperator">AbstractKafkaInputOperator</h3>
+<p>This is the abstract implementation that serves as the base class for consuming 
messages from the Kafka messaging system. This class doesn't have any ports.</p>
+<p><img alt="AbstractKafkaInput.png" src="../images/kafkainput/image00.png" 
/></p>
+<h4 id="configuration-parameters">Configuration Parameters</h4>
+<p><table>
+<col width="25%" />
+<col width="75%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>maxTuplesPerWindow</p></td>
+<td align="left"><p>Controls the maximum number of messages emitted in each 
streaming window from this operator. Minimum value is 1. Default value = 
MAX_VALUE </p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>idempotentStorageManager</p></td>
+<td align="left"><p>This is an instance of IdempotentStorageManager. 
Idempotency ensures that the operator will process the same set of messages in 
a window before and after a failure. For example, let's say the operator 
completed window 10 and failed somewhere in window 11. If the operator 
gets restored at window 10 then it will process the same messages again in 
window 10 which it did in the previous run before the failure. Idempotency is 
important but comes with higher cost because at the end of each window the 
operator needs to persist some state with respect to that window. Default Value 
= 
com.datatorrent.lib.io.IdempotentStorageManager.<br>NoopIdempotentStorageManager</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>strategy</p></td>
+<td align="left"><p>Operator supports two types of partitioning strategies, 
ONE_TO_ONE and ONE_TO_MANY.</p>
+<p>ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator 
instance per Kafka topic partition. So the number of Kafka topic partitions 
equals the number of operator instances.</p>
+<p>ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka 
input operator instances where N is the number of Kafka topic partitions. If K 
is less than N, the remaining topic partitions are assigned to the K operator 
instances in round-robin fashion. If K is less than initialPartitionCount, the 
AppMaster creates one input operator instance per Kafka topic partition. For 
example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 
then AppMaster creates 2 Kafka input operator instances.
+Default Value = ONE_TO_ONE</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>msgRateUpperBound</p></td>
+<td align="left"><p>Maximum messages upper bound. Operator repartitions when 
the <em>msgProcessedPS</em> exceeds this bound. <em>msgProcessedPS</em> is the 
average number of messages processed per second by this operator.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>byteRateUpperBound</p></td>
+<td align="left"><p>Maximum bytes upper bound. Operator repartitions when the 
<em>bytesPS</em> exceeds this bound. <em>bytesPS</em> is the average number of 
bytes processed per second by this operator.</p>
+<p></p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>offsetManager</p></td>
+<td align="left"><p>This is an optional parameter that is useful when the 
application restarts or starts at specific offsets (offsets are explained 
below)</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>repartitionInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies 
the minimum time required between two repartition actions. Default Value = 30 
Seconds</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>repartitionCheckInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies 
the minimum interval between two offset updates. Default Value = 5 
Seconds</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>initialPartitionCount</p></td>
+<td align="left"><p>When the ONE_TO_MANY partition strategy is enabled, this 
value indicates the number of Kafka input operator instances. Default Value = 
1</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>consumer</p></td>
+<td align="left"><p>This is an instance of 
com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of 
SimpleKafkaConsumer.</p></td>
+</tr>
+</tbody>
+</table></p>
+<h4 id="abstract-methods">Abstract Methods</h4>
+<p><code>void emitTuple(Message message)</code>: Abstract method that emits 
tuples extracted from Kafka message.</p>
+<h3 id="kafkaconsumer">KafkaConsumer</h3>
+<p>This is an abstract implementation of a Kafka consumer. It sends fetch
+requests to the leading brokers of Kafka partitions. For each request,
+it receives a set of messages and stores them in a buffer, which is an
+ArrayBlockingQueue. SimpleKafkaConsumer extends
+KafkaConsumer and provides the functionality of the Simple Consumer API;
+HighLevelKafkaConsumer extends KafkaConsumer and provides the
+functionality of the High Level Consumer API.</p>
+<h3 id="pre-requisites">Pre-requisites</h3>
+<p>This operator uses the Kafka 0.8.2.1 client consumer API
+and will work with 0.8.x and 0.7.x versions of Kafka broker.</p>
+<h4 id="configuration-parameters_1">Configuration Parameters</h4>
+<table>
+<col width="15%" />
+<col width="15%" />
+<col width="15%" />
+<col width="55%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>zookeeper</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p></p></td>
+<td align="left"><p>Specifies the zookeeper quorum of Kafka clusters that you 
want to consume messages from. zookeeper  is a string in the form of 
hostname1:port1,hostname2:port2,hostname3:port3  where 
hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of 
zookeeper server.  If the topic name is the same across the Kafka clusters and 
you want to consume data from these clusters, then configure the zookeeper as 
follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5;c3::hs6:p6</p>
+<p>where</p>
+<p>c1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper 
hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is 
optional in case of single cluster</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>cacheSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1024</p></td>
+<td align="left"><p>Maximum number of buffered messages held in memory.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>topic</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>default_topic</p></td>
+<td align="left"><p>Indicates the name of the topic.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>initialOffset</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>latest</p></td>
+<td align="left"><p>Indicates the type of offset, i.e., “earliest” or 
“latest”. If initialOffset is “latest”, then the operator consumes 
messages from the latest point of the Kafka queue. If initialOffset is “earliest”, 
then the operator consumes messages starting from the beginning of the message queue. This can be 
overridden by OffsetManager.</p></td>
+</tr>
+</tbody>
+</table>
+
+<h4 id="abstract-methods_1">Abstract Methods</h4>
+<ol>
+<li>void commitOffset(): Commit the offsets at checkpoint.</li>
+<li>Map &lt;KafkaPartition, Long&gt; getCurrentOffsets(): Return the current
+    offset status.</li>
+<li>resetPartitionsAndOffset(Set &lt;KafkaPartition&gt; partitionIds,
+    Map &lt;KafkaPartition, Long&gt; startOffset): Reset the partitions with
+    partitionIds and offsets with startOffset.</li>
+</ol>
+<h4 id="configuration-parameters-for-simplekafkaconsumer">Configuration 
Parameters for SimpleKafkaConsumer</h4>
+<table>
+<col width="25%" />
+<col width="15%" />
+<col width="15%" />
+<col width="45%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>bufferSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1 MB</p></td>
+<td align="left"><p>Specifies the maximum total size of messages for each 
fetch request.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>metadataRefreshInterval</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>30 Seconds</p></td>
+<td align="left"><p>Interval, in milliseconds, between refreshes of the metadata 
(broker changes). Enabling metadata refresh guarantees an automatic 
reconnect when a new broker is elected as the host. A value of -1 disables this 
feature.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>metadataRefreshRetryLimit</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>-1</p></td>
+<td align="left"><p>Specifies the maximum brokers' metadata refresh retry 
limit. -1 means unlimited retry.</p></td>
+</tr>
+</tbody>
+</table>
+
+<h3 id="offsetmanager">OffsetManager</h3>
+<p>This is an interface for offset management and is useful when consuming data
+from specified offsets. Updates the offsets for all the Kafka partitions
+periodically. Below is the code snippet:        </p>
+<pre><code class="java">public interface OffsetManager
+{
+  public Map&lt;KafkaPartition, Long&gt; loadInitialOffsets();
+  public void updateOffsets(Map&lt;KafkaPartition, Long&gt; 
offsetsOfPartitions);
+}
+</code></pre>
+
+<h4 id="abstract-methods_2">Abstract Methods</h4>
+<p><code>Map &lt;KafkaPartition, Long&gt; loadInitialOffsets()</code>: 
Specifies the initial offset for consuming messages; called at the activation 
stage.</p>
+<p><code>updateOffsets(Map&lt;KafkaPartition, Long&gt; 
offsetsOfPartitions)</code>:  This
+method is called at every repartitionCheckInterval to update offsets.</p>
+<h3 id="partitioning">Partitioning</h3>
+<p>The logical instance of the KafkaInputOperator acts as the Partitioner
+as well as a StatsListener. This is because the
+AbstractKafkaInputOperator implements both the
+com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener
+interfaces and provides an implementation of definePartitions(...) and
+processStats(...) which makes it auto-scalable.</p>
+<h4 id="response-processstatsbatchedoperatorstats-stats">Response 
processStats(BatchedOperatorStats stats)</h4>
+<p>The application master invokes this method on the logical instance with
+the stats (tuplesProcessedPS, bytesPS, etc.) of each partition.
+Re-partitioning happens based on whether any new Kafka partitions were added for
+the topic or bytesPS and msgPS cross their respective upper bounds.</p>
+<h4 id="definepartitions">DefinePartitions</h4>
+<p>Based on the repartitionRequired field of the Response object which is
+returned by processStats(...) method, the application master invokes
+definePartitions(...) on the logical instance which is also the
+partitioner instance. Dynamic partition can be disabled by setting the
+parameter repartitionInterval value to a negative value.</p>
+<h3 
id="abstractsingleportkafkainputoperator">AbstractSinglePortKafkaInputOperator</h3>
+<p>This class extends AbstractKafkaInputOperator to emit messages through 
single output port.</p>
+<h4 id="ports">Ports</h4>
+<p><code>outputPort &lt;T&gt;</code>: Tuples extracted from Kafka messages are 
emitted through this port.</p>
+<h4 id="abstract-methods_3">Abstract Methods</h4>
+<p><code>T getTuple(Message msg)</code>: Converts the Kafka message to 
tuple.</p>
+<h3 id="concrete-classes">Concrete Classes</h3>
+<ol>
+<li>KafkaSinglePortStringInputOperator: extends 
<code>AbstractSinglePortKafkaInputOperator</code>, extracts string from Kafka 
message.</li>
+<li>KafkaSinglePortByteArrayInputOperator: extends 
<code>AbstractSinglePortKafkaInputOperator</code>, extracts byte array from 
Kafka message.</li>
+</ol>
+<h3 id="application-example">Application Example</h3>
+<p>This section builds an Apex application using Kafka input operator.
+Below is the code snippet:</p>
+<pre><code class="java">@ApplicationAnnotation(name = &quot;KafkaApp&quot;)
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortByteArrayInputOperator input =  
dag.addOperator(&quot;MessageReader&quot;, new 
KafkaSinglePortByteArrayInputOperator());
+    ConsoleOutputOperator output = dag.addOperator(&quot;Output&quot;, new 
ConsoleOutputOperator());
+    dag.addStream(&quot;MessageData&quot;, input.outputPort, output.input);
+  }
+}
+</code></pre>
+
+<p>Below is the configuration, where “test” is the Kafka topic name and
+“localhost:2181” is the zookeeper quorum:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topic&lt;/name&gt;
+  &lt;value&gt;test&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.zookeeper&lt;/name&gt;
+  &lt;value&gt;localhost:2181&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<h3 id="kafka-input-operator-for-kafka-09x">Kafka Input Operator for Kafka 
0.9.x</h3>
+<p>Package: <code>org.apache.apex.malhar.kafka</code></p>
+<p>Maven Artifact: <a 
href="https://mvnrepository.com/artifact/org.apache.apex/malhar-kafka">malhar-kafka</a></p>
+<p>This version uses the new 0.9 version of consumer API and works with Kafka 
broker version 0.9 and later.
+The operator is fault-tolerant, scalable and supports input from multiple 
clusters and multiple topics in a single operator instance.</p>
+<h4 id="pre-requisites_1">Pre-requisites</h4>
+<p>This operator requires version 0.9.0 or later of the Kafka Consumer API.</p>
+<h3 id="abstractkafkainputoperator_1">AbstractKafkaInputOperator</h3>
+<h4 id="ports_1">Ports</h4>
+<hr />
+<p>This abstract class doesn't have any ports.</p>
+<h4 id="configuration-properties">Configuration properties</h4>
+<hr />
+<ul>
+<li>
+<p><strong><em>clusters</em></strong> - String[]</p>
+<ul>
+<li>Mandatory Parameter.</li>
+<li>Specifies the Kafka clusters that you want to consume messages from. To 
configure multi-cluster support, you need to specify the clusters separated by 
";".</li>
+</ul>
+</li>
+<li>
+<p><strong><em>topics</em></strong> - String[]</p>
+<ul>
+<li>Mandatory Parameter.</li>
+<li>Specifies the Kafka topics that you want to consume messages from. If you 
want multi-topic support, then specify the topics separated by ",".</li>
+</ul>
+</li>
+<li>
+<p><strong><em>strategy</em></strong> - PartitionStrategy</p>
+<ul>
+<li>
+<p>Operator supports two types of partitioning strategies, 
<code>ONE_TO_ONE</code> and <code>ONE_TO_MANY</code>.</p>
+<p><code>ONE_TO_ONE</code>: If this is enabled, the AppMaster creates one 
input operator instance per Kafka topic partition. So the number of Kafka topic 
partitions equals the number of operator instances.
+<code>ONE_TO_MANY</code>: The AppMaster creates K = min(initialPartitionCount, 
N) Kafka input operator instances where N is the number of Kafka topic 
partitions. If K is less than N, the remaining topic partitions are assigned to 
the K operator instances in round-robin fashion. If K is less than 
initialPartitionCount, the AppMaster creates one input operator instance per 
Kafka topic partition. For example, if initialPartitionCount = 5 and number of 
Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+Default Value = <code>PartitionStrategy.ONE_TO_ONE</code>.</p>
+</li>
+</ul>
+</li>
+<li>
+<p><strong><em>initialPartitionCount</em></strong> - Integer</p>
+<ul>
+<li>When the ONE_TO_MANY partition strategy is enabled, this value indicates 
the number of Kafka input operator instances. 
+    Default Value = 1.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>repartitionInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum time 
required between two repartition actions. 
+    Default Value = 30 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>repartitionCheckInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum 
interval between two stat checks.
+    Default Value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>maxTuplesPerWindow</em></strong> - Integer</p>
+<ul>
+<li>Controls the maximum number of messages emitted in each streaming window 
from this operator. Minimum value is 1. 
+    Default value = <code>MAX_VALUE</code> </li>
+</ul>
+</li>
+<li>
+<p><strong><em>initialOffset</em></strong> - InitialOffset</p>
+<ul>
+<li>Indicates the type of offset, i.e., <code>EARLIEST</code> or 
<code>LATEST</code> or <code>APPLICATION_OR_EARLIEST</code> or 
<code>APPLICATION_OR_LATEST</code>. 
+    <code>LATEST</code> =&gt; Consume new messages from latest offset in the 
topic. 
+    <code>EARLIEST</code> =&gt; Consume all messages available in the topic.
+    <code>APPLICATION_OR_EARLIEST</code> =&gt; Consume messages from committed 
position from last run. If there is no committed offset, then start consuming 
from beginning.
+    <code>APPLICATION_OR_LATEST</code> =&gt; Consumes messages from committed 
position from last run. If a committed offset is unavailable, then start 
consuming from latest position.
+    Default value = <code>InitialOffset.APPLICATION_OR_LATEST</code></li>
+</ul>
+</li>
+<li>
+<p><strong><em>metricsRefreshInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum 
interval between two metric stat updates.
+    Default value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>consumerTimeout</em></strong> - Long</p>
+<ul>
+<li>Indicates the <a 
href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll">time
 waiting in poll</a> when data is not available.
+    Default value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>holdingBufferSize</em></strong> - Long</p>
+<ul>
+<li>Indicates the maximum number of messages kept in memory for emitting.
+    Default value = 1024.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>consumerProps</em></strong> - Properties</p>
+<ul>
+<li>Specify the <a 
href="http://kafka.apache.org/090/documentation.html#newconsumerconfigs">consumer properties</a> 
which are not yet set to the operator.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>windowDataManager</em></strong> - WindowDataManager</p>
+<ul>
+<li>If set to a value other than the default, such as 
<code>FSWindowDataManager</code>, specifies that the operator will process the 
same set of messages in a window before and after a failure. This is important 
but it comes with higher cost because at the end of each window the operator 
needs to persist some state with respect to that window.
+    Default value = <code>WindowDataManager.NoopWindowDataManager</code>.</li>
+</ul>
+</li>
+</ul>
+<h4 id="abstract-methods_4">Abstract Methods</h4>
+<p><code>void emitTuple(String cluster, ConsumerRecord&lt;byte[], byte[]&gt; 
message)</code>: Abstract method that emits tuples
+extracted from Kafka message.</p>
+<h3 id="concrete-classes_1">Concrete Classes</h3>
+<h4 id="kafkasingleportinputoperator">KafkaSinglePortInputOperator</h4>
+<p>This class extends from AbstractKafkaInputOperator and defines the 
<code>getTuple()</code> method which extracts byte array from Kafka message.</p>
+<h4 id="ports_2">Ports</h4>
+<p><code>outputPort &lt;byte[]&gt;</code>: Tuples extracted from Kafka 
messages are emitted through this port.</p>
+<h3 id="application-example_1">Application Example</h3>
+<p>This section builds an Apex application using Kafka input operator.
+Below is the code snippet:</p>
+<pre><code class="java">@ApplicationAnnotation(name = &quot;KafkaApp&quot;)
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortInputOperator input =  
dag.addOperator(&quot;MessageReader&quot;, new KafkaSinglePortInputOperator());
+    ConsoleOutputOperator output = dag.addOperator(&quot;Output&quot;, new 
ConsoleOutputOperator());
+    dag.addStream(&quot;MessageData&quot;, input.outputPort, output.input);
+  }
+}
+</code></pre>
+
+<p>Below is the configuration, where “test” is the Kafka topic name and
+“localhost:9092” is the Broker:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topics&lt;/name&gt;
+  &lt;value&gt;test&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+  &lt;value&gt;localhost:9092&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>By adding following lines to properties file, Kafka Input Operator supports 
multi-topic and multi-cluster:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topics&lt;/name&gt;
+  &lt;value&gt;test1, test2&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+  &lt;value&gt;localhost:9092; localhost:9093; localhost:9094&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>For a full example application project, refer to 
<a href="https://github.com/DataTorrent/examples/tree/master/tutorials/kafka">https://github.com/DataTorrent/examples/tree/master/tutorials/kafka</a>.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../transform/" class="btn btn-neutral float-right" 
title="Transformer">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../jsonParser/" class="btn btn-neutral" title="JSON 
Parser"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../jsonParser/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../transform/" style="color: 
#fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/operators/s3outputmodule/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/operators/s3outputmodule/index.html 
b/docs/malhar-3.7/operators/s3outputmodule/index.html
new file mode 100644
index 0000000..9f7f3bf
--- /dev/null
+++ b/docs/malhar-3.7/operators/s3outputmodule/index.html
@@ -0,0 +1,581 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>S3 Output Module - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "S3 Output Module";
+    var mkdocs_page_input_path = "operators/s3outputmodule.md";
+    var mkdocs_page_url = "/operators/s3outputmodule/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">S3 Output Module</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a 
href="#s3outputmodule">S3OutputModule</a></li>
+                
+                    <li><a class="toctree-l4" href="#about-amazon-s3">About 
Amazon S3</a></li>
+                
+                    <li><a class="toctree-l4" href="#s3-output-module">S3 
Output Module</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#overview">Overview</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#multipart-upload-feature">Multipart Upload Feature</a></li>
+                
+                    <li><a class="toctree-l4" href="#module">Module</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operators-in-s3outputmodule">Operators in S3OutputModule</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#configuration-parameters">Configuration Parameters</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#application-example">Application Example</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#partitioning">Partitioning</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>S3 Output Module</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="s3outputmodule">S3OutputModule</h1>
+<h3 id="about-amazon-s3">About Amazon S3</h3>
+<hr />
+<p><strong>Amazon S3 (Simple Storage Service)</strong> is an object storage 
system with a web service interface to store and retrieve any amount of data at 
any time from anywhere on the web, offered by Amazon Web Services.</p>
+<h3 id="s3-output-module">S3 Output Module</h3>
+<hr />
+<p>The purpose of the S3Output module is to upload files/directories into an Amazon S3 
bucket using the multipart upload feature (see below).</p>
+<p>S3Output module is <strong>fault-tolerant</strong>, 
<strong>statically/dynamically partitionable</strong> and has <strong>exactly 
once</strong> semantics.</p>
+<p>Module class is <strong>S3OutputModule</strong> located in the package 
<strong>org.apache.apex.malhar.lib.fs.s3</strong>; please refer to <a 
href="https://github.com/apache/apex-malhar/blob/master/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java">github
 URL</a>.</p>
+<h3 id="overview">Overview</h3>
+<hr />
+<p>File upload to S3 can also be done using 
<strong>AbstractFileOutputOperator</strong> but that operator uploads large 
files sequentially; the current module in contrast can substantially improve 
the upload speed of large files by reading and uploading their constituent 
blocks in parallel. </p>
+<p>The table below lists additional benefits of this module over 
<strong>AbstractFileOutputOperator</strong>.</p>
+<table>
+<thead>
+<tr>
+<th><strong>S3OutputModule</strong></th>
+<th><strong>AbstractFileOutputOperator</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>Maximum upload file size is 5TB.</td>
+<td>Maximum upload file size is 5GB.</td>
+</tr>
+<tr>
+<td>Best fit for both large and small files.</td>
+<td>Best fit for small files.</td>
+</tr>
+<tr>
+<td>Module uses AmazonS3Client APIs to upload objects into S3. Large files 
are uploaded using the multipart feature and small files (single block) are uploaded 
using the <strong>putObject(...)</strong> API.</td>
+<td>Operator uses Hadoop filesystems like <strong>S3AFileSystem</strong>. 
Consists of couple of steps to upload object into S3: (1) Write the data into 
the local filesystem. (2) When the stream closes, filesystem uploads the local 
object into S3.</td>
+</tr>
+<tr>
+<td>If a block fails to upload then you need to re-upload the data for that 
block only</td>
+<td>If a file fails to upload then you need to re-upload the complete 
file.</td>
+</tr>
+</tbody>
+</table>
+<h3 id="multipart-upload-feature">Multipart Upload Feature</h3>
+<hr />
+<p>Uploading parts of a file is done via the <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html">multipart
 feature</a>; using this feature, each part of a file can be uploaded 
independently.
+After all parts of a file are uploaded successfully, Amazon S3 combines the 
parts as a single object.</p>
+<p>Please refer to the <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/dev/llJavaUploadFile.html">Java
 code</a> for uploading a file into an Amazon S3 bucket using the multipart feature.</p>
+<h3 id="module">Module</h3>
+<hr />
+<p>A <strong>module</strong> is a group of operators pre-wired together so 
they work as a single conceptual entity in an application. Typically, a module 
will contain a set of input ports, output ports and configuration properties. 
The operators internal to the module will be automatically configured based on 
the supplied module properties.</p>
+<h3 id="operators-in-s3outputmodule">Operators in S3OutputModule</h3>
+<hr />
+<p>Following diagram illustrates the DAG in this module:</p>
+<p><img alt="" src="../images/s3output/s3outputmodule.png" /></p>
+<ul>
+<li>
+<p><strong><em>S3InitiateFileUploadOperator</em></strong></p>
+<ul>
+<li>Initiate the upload for the file using 
<strong>AmazonS3Client.initiateMultipartUpload(...)</strong> method only if the 
number of blocks for a file is greater than 1. By successfully initiating the 
upload, S3 returns a response of type 
<strong>InitiateMultipartUploadResult</strong>, which includes the 
<strong>upload ID</strong>, which is the unique identifier for the multipart 
upload. This <strong>upload ID</strong> must be included in each operation like 
block upload and upload completion request.
+If the file has a single block then the operator emits an empty string; this is 
an indication to downstream operators to not use the multi-part feature.</li>
+<li>This operator emits the pair <strong>(filemetadata, uploadId)</strong> to 
<strong>S3FileMerger</strong> and the triple <strong>(filePath, metadata, 
uploadId)</strong> to <strong>S3BlockUploadOperator</strong>.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>S3BlockUploadOperator</em></strong></p>
+<ul>
+<li>This operator uploads the blocks into S3 using different calls which depend 
on the number of blocks of a file.
+If the file has a single block then upload the block using 
<strong>AmazonS3Client.putObject(...)</strong> call. S3 returns a response of 
type <strong>PutObjectResult</strong> which includes the <strong>ETag</strong>.
+If the file has more blocks then upload the block using 
<strong>AmazonS3Client.uploadPart(...)</strong> call. S3 returns a response of 
type <strong>UploadPartResult</strong> which includes the 
<strong>ETag</strong>. This <strong>ETag</strong> value must be included in the 
request to complete multipart upload.</li>
+<li><strong>S3BlockUploadOperator</strong> emits the pair <strong>(path, 
ETag)</strong> to <strong>S3FileMerger</strong>.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>S3FileMerger</em></strong></p>
+<ul>
+<li>Complete multipart upload request using 
<strong>AmazonS3Client.completeMultipartUpload(...)</strong>. This call must 
include the <strong>upload ID</strong> and a list of both part numbers and 
corresponding <strong>ETag</strong> values. <strong>S3FileMerger</strong> sends 
the complete multi-part upload request to S3 once it has all the <strong>part 
ETag's</strong> of a file. 
+<strong>Amazon S3</strong> creates an object by concatenating the parts in 
ascending order based on part number. After a successful upload request, the 
parts no longer exist and S3 response includes an <strong>ETag</strong> which 
uniquely identifies the combined object data. </li>
+</ul>
+</li>
+</ul>
+<h3 id="configuration-parameters">Configuration Parameters</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>accessKey</em></strong> -   String</p>
+<ul>
+<li>Mandatory Parameter    </li>
+<li>Specifies the AWS access key used to access Amazon S3; it must have permissions to 
access the specified bucket.</li>
+<li>Example value = AKIAJVAGFANC2LSZCJ4Q</li>
+</ul>
+</li>
+<li>
+<p><strong><em>secretAccessKey</em></strong>   -   String</p>
+<ul>
+<li>Mandatory Parameter</li>
+<li>Specifies the AWS secret access key used to access Amazon S3; it must have 
permissions to access the specified bucket.</li>
+<li>Example value = wpVr3U82RmCKJoY007YfkaawT7CenhTcK1B8clue</li>
+</ul>
+</li>
+<li>
+<p><strong><em>endPoint</em></strong>  -   String</p>
+<ul>
+<li>Endpoint is the URL for the entry point for a web service. Specify the 
valid endpoint to access S3 bucket.</li>
+<li>This is an optional parameter. If the bucket is accessed only from a 
specific endpoint then the user has to specify this parameter.</li>
+<li>Please refer to the <a 
href="http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region">endPoint</a>
 table for the endpoints supported by S3. </li>
+<li>Example value = s3.amazonaws.com </li>
+</ul>
+</li>
+<li>
+<p><strong><em>bucketName</em></strong>    -   String</p>
+<ul>
+<li>Mandatory Parameter</li>
+<li>S3 buckets are used to store objects which consist of data and metadata 
that describes the data. Specify the name of the bucket.</li>
+<li>Example value = apex.app.test.s3</li>
+</ul>
+</li>
+<li>
+<p><strong><em>outputDirectoryPath</em></strong>   -   String</p>
+<ul>
+<li>Mandatory Parameter</li>
+<li>Specifies the path of the output directory. </li>
+<li>Example value = dt/app/output</li>
+</ul>
+</li>
+<li>
+<p><strong><em>mergerCount</em></strong>   -   int</p>
+<ul>
+<li>Specify the number of instances of S3FileMerger operator.</li>
+<li>Default value = 1</li>
+</ul>
+</li>
+<li>
+<p><strong><em>timeOutWindowCount</em></strong>    -   int</p>
+<ul>
+<li>This property maps to the <a 
href="https://github.com/apache/apex-core/blob/master/api/src/main/java/com/datatorrent/api/Context.java">OperatorContext.TIMEOUT_WINDOW_COUNT</a>
 attribute and is a count of streaming windows. If specified, it will be set on 
all the operators of this module. Since these operators interact with S3, there 
may be additional latencies that cause the platform to kill them because they 
are considered stalled. Increasing this value prevents this and allows the 
application to proceed despite the latencies.</li>
+<li>Default value = 6000</li>
+</ul>
+</li>
+</ul>
+<h3 id="ports">Ports</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>filesMetadataInput</em></strong>    -   
AbstractFileSplitter.FileMetadata</p>
+<ul>
+<li>Input port for files metadata.</li>
+<li>Mandatory</li>
+</ul>
+</li>
+<li>
+<p><strong><em>blocksMetadataInput</em></strong>   -   
BlockMetadata.FileBlockMetadata</p>
+<ul>
+<li>Input port for blocks metadata.</li>
+<li>Mandatory</li>
+</ul>
+</li>
+<li>
+<p><strong><em>blockData</em></strong> -   
AbstractBlockReader.ReaderRecord&lt;Slice&gt;</p>
+<ul>
+<li>Input port for blocks data.</li>
+<li>Mandatory</li>
+</ul>
+</li>
+</ul>
+<h3 id="application-example">Application Example</h3>
+<hr />
+<p>Please refer to <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/s3output">Example</a>
 for S3OutputModule sample application.</p>
+<h3 id="partitioning">Partitioning</h3>
+<hr />
+<p>Partitioning the module means that the operators in the module can be 
partitioned.</p>
+<h4 id="stateless-partitioning">Stateless Partitioning</h4>
+<hr />
+<p>Partitioning the operator in module can be achieved as follows:</p>
+<h5 id="s3initiatefileuploadoperator">S3InitiateFileUploadOperator</h5>
+<hr />
+<p>Partition of this operator is achieved indirectly as follows:</p>
+<pre><code class="xml">&lt;property&gt;
+  
&lt;name&gt;dt.operator.{ModuleName}#InitiateUpload.attr.PARTITIONER&lt;/name&gt;
+  
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:{N}&lt;/value&gt;
+&lt;/property&gt;     
+</code></pre>
+
+<p>where {ModuleName} is the name of the S3OutputModule and
+      {N} is the number of static partitions.
+Above lines will partition S3InitiateFileUploadOperator statically {N} 
times.</p>
+<h5 id="s3blockuploadoperator">S3BlockUploadOperator</h5>
+<hr />
+<p>Locality of S3BlockUploadOperator with upstream operator 
(FSInputModule/BlockReader) must be set to PARTITION_PARALLEL for performance 
benefits by avoiding serialization/deserialization of objects. So, partitioning 
of this operator depends on the upstream operator, which is of type 
FSInputModule/BlockReader.</p>
+<h5 id="s3filemerger">S3FileMerger</h5>
+<hr />
+<p>By setting the parameter "mergerCount", <strong>S3FileMerger</strong> can be 
statically partitioned. This can be achieved in two ways:</p>
+<p>(a) The following code can be added to the populateDAG(DAG dag, Configuration conf) 
method of the application to statically partition <strong>S3FileMerger</strong> 
{N} times:</p>
+<pre><code class="java">  FSInputModule inputModule = 
dag.addModule(&quot;HDFSInputModule&quot;, new FSInputModule());
+  S3OutputModule outputModule = dag.addModule(&quot;S3OutputModule&quot;, new 
S3OutputModule());
+  outputModule.setMergerCount({N});
+</code></pre>
+
+<p>(b) By setting the parameter in properties file as follows</p>
+<pre><code class="xml">  &lt;property&gt;
+    &lt;name&gt;dt.operator.{ModuleName}.prop.mergerCount&lt;/name&gt;
+    &lt;value&gt;{N}&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<p>where {ModuleName} is the name of the S3OutputModule and {N} is the number 
of static partitions.
+Above lines will partition <strong>S3FileMerger</strong> statically {N} times. 
         </p>
+<h4 id="dynamic-partitioning">Dynamic Partitioning</h4>
+<hr />
+<p>Dynamic partitioning is a feature of Apex platform which changes the number 
of partitions of an operator at run time.
+Locality of <strong>S3BlockUploadOperator</strong> with upstream 
operator (FSInputModule/BlockReader) must be set to PARTITION_PARALLEL for 
performance benefits by avoiding serialization/deserialization of objects. So, 
dynamic partitioning of this operator depends on the upstream operator, which is of 
type FSInputModule/BlockReader.</p>
+<p>In the example application, by setting the maxReaders and minReaders 
values on FSInputModule, <strong>S3BlockUploadOperator</strong> is dynamically 
partitioned between minReaders and maxReaders. This can be achieved in two ways:
+(a) The following code can be added to the <strong>populateDAG(DAG dag, Configuration 
conf)</strong> method of the application to dynamically partition 
<strong>S3BlockUploadOperator</strong> between {N1} and {N2} times:</p>
+<pre><code class="java">FSInputModule inputModule = 
dag.addModule(&quot;HDFSInputModule&quot;, new FSInputModule());
+inputModule.setMinReaders({N1});
+inputModule.setMaxReaders({N2});
+S3OutputModule outputModule = dag.addModule(&quot;S3OutputModule&quot;, new 
S3OutputModule());
+</code></pre>
+
+<p>(b) By setting the parameter in properties file as follows:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.HDFSInputModule.prop.minReaders&lt;/name&gt;
+  &lt;value&gt;{N1}&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;dt.operator.HDFSInputModule.prop.maxReaders&lt;/name&gt;
+  &lt;value&gt;{N2}&lt;/value&gt;
+&lt;/property&gt;         
+</code></pre>
+
+<p>{N1} and {N2} represent the minimum and maximum number of partitions of 
BlockReader.
+The above lines will dynamically partition the 
<strong>S3BlockUploadOperator</strong> between {N1} and {N2} times. </p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+      
+        <a href="../csvformatter/" class="btn btn-neutral" title="Csv 
Formatter"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../csvformatter/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/operators/transform/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/operators/transform/index.html 
b/docs/malhar-3.7/operators/transform/index.html
new file mode 100644
index 0000000..42675ed
--- /dev/null
+++ b/docs/malhar-3.7/operators/transform/index.html
@@ -0,0 +1,598 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Transform Operator - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Transform Operator";
+    var mkdocs_page_input_path = "operators/transform.md";
+    var mkdocs_page_url = "/operators/transform/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="./">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Transform Operator</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a 
href="#transform-operator-documentation">Transform - Operator 
Documentation</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#about-transform-operator">About Transform operator</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case">Use 
Case</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#configuration-parameters">Configuration Parameters</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#configuration-example">Configuration Example</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#attributes">Attributes</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#application-example">Application Example</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#partitioning">Partitioning</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../s3outputmodule/">S3 Output Module</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Transform Operator</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="transform-operator-documentation">Transform - Operator 
Documentation</h1>
+<h3 id="about-transform-operator">About Transform operator</h3>
+<hr />
+<p>Transform means mapping of field expression from input to output or 
conversion of fields from one type to another.
+This operator is stateless. This operator receives objects on its input port; 
for each such input object, it creates a new output object whose fields are 
computed as expressions involving fields of the input object. 
+The types of the input and output objects are configurable as are the 
expressions used to compute the output fields. </p>
+<p>The operator class is <code>TransformOperator</code> located in the package 
<code>com.datatorrent.lib.transform</code>.
+Please refer to <a 
href="https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java">github
 URL</a> for <code>TransformOperator</code>.</p>
+<h3 id="use-case">Use Case</h3>
+<hr />
+<p>Consider the data that needs to be transformed as per output schema.</p>
+<p>Consider input objects with these fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>FirstName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>LastName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Phone</td>
+<td>String</td>
+</tr>
+<tr>
+<td>DateOfBirth</td>
+<td>java.util.Date</td>
+</tr>
+<tr>
+<td>Address</td>
+<td>String</td>
+</tr>
+</tbody>
+</table>
+<p>and output objects with fields: </p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>Name</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Phone</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Age</td>
+<td>Integer</td>
+</tr>
+<tr>
+<td>Address</td>
+<td>String</td>
+</tr>
+</tbody>
+</table>
+<p>Suppose <code>Name</code> is a concatenation of <code>FirstName</code> and 
<code>LastName</code> and 
+        <code>Age</code> is computed by subtracting the 
<code>DateOfBirth</code> from the current year.</p>
+<p>These simple computations can be expressed as Java expressions where the 
input object is
+represented by $ and provided as configuration parameters as follows:</p>
+<pre><code>Name =&gt; {$.FirstName}.concat(\&quot; 
\&quot;).concat({$.LastName})
+Age =&gt; (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
+</code></pre>
+
+<h3 id="configuration-parameters">Configuration Parameters</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>expressionMap</em></strong> -   Map&lt;String, String&gt;</p>
+<ul>
+<li>Mandatory Parameter</li>
+<li>Specifies the map between the output field (key) and the expression used 
to compute it (value) using fields of the input Java object.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>expressionFunctions</em></strong> -   List&lt;String&gt;</p>
+<ul>
+<li>List of classes or methods to import so that they are available for use 
in expressions. It overrides the default list.</li>
+<li>Default Value = {java.lang.Math.*, 
org.apache.commons.lang3.StringUtils.*, 
org.apache.commons.lang3.StringEscapeUtils.*, 
org.apache.commons.lang3.time.DurationFormatUtils.*, 
org.apache.commons.lang3.time.DateFormatUtils.*}</li>
+</ul>
+</li>
+<li>
+<p><strong><em>copyMatchingFields</em></strong> -   boolean</p>
+<ul>
+<li>Specifies whether matching fields should be copied; here matching means 
the name and type of an input field is the same as the name and type of an 
output field. 
+    If the matching field appears in <code>expressionMap</code> then it 
is not copied to the output object.</li>
+<li>Default Value = true.</li>
+</ul>
+</li>
+</ul>
+<h3 id="configuration-example">Configuration Example</h3>
+<hr />
+<p>Consider input object with fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>FirstName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>LastName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>StartDate</td>
+<td>org.joda.time.DateTime</td>
+</tr>
+</tbody>
+</table>
+<p>and output objects with fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>Name</td>
+<td>String</td>
+</tr>
+<tr>
+<td>isLeapYear</td>
+<td>Boolean</td>
+</tr>
+</tbody>
+</table>
+<p>Note: <code>org.joda.time.DateTime</code> class is not present in the 
default list. So, we need to add this library to 
<code>expressionFunctions</code> as below in populateDAG method:</p>
+<pre><code class="java">TransformOperator operator = 
dag.addOperator(&quot;transform&quot;, new TransformOperator());
+operator.setExpressionFunctions(Arrays.asList(&quot;org.joda.time.DateTime&quot;,
 &quot;org.apache.commons.lang3.StringUtils&quot;));
+Map&lt;String,String&gt; expressionMap = new HashMap&lt;&gt;();
+expressionMap.put(&quot;isLeapYear&quot;, &quot;{$.StartDate}.year().isLeap()&quot;);
+expressionMap.put(&quot;Name&quot;, &quot;org.apache.commons.lang3.StringUtils.joinWith(\&quot; 
\&quot;, {$.FirstName},{$.LastName})&quot;);
+operator.setExpressionMap(expressionMap);
+</code></pre>
+
+<p>The above properties can also be set in a properties file as follows:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionFunctions[0]&lt;/name&gt;
+  &lt;value&gt;org.joda.time.DateTime&lt;/value&gt;
+&lt;/property&gt;     
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionFunctions[1]&lt;/name&gt;
+  &lt;value&gt;org.apache.commons.lang3.StringUtils&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionMap(isLeapYear)&lt;/name&gt;
+  &lt;value&gt;{$.StartDate}.year().isLeap()&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionMap(Name)&lt;/name&gt;
+  &lt;value&gt;org.apache.commons.lang3.StringUtils.joinWith(\&quot; \&quot;, 
{$.FirstName}, {$.LastName})&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<h3 id="ports">Ports</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>input</em></strong> -   Port for input tuples.</p>
+<ul>
+<li>Mandatory input port</li>
+</ul>
+</li>
+<li>
+<p><strong><em>output</em></strong>    -   Port for transformed output 
tuples.</p>
+<ul>
+<li>Mandatory output port</li>
+</ul>
+</li>
+</ul>
+<h3 id="attributes">Attributes</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>Input port Attribute - input.TUPLE_CLASS</em></strong> - Fully 
qualified class name and class should be Kryo serializable.</p>
+<ul>
+<li>Mandatory attribute</li>
+<li>Type of input tuple.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>Output port Attribute - output.TUPLE_CLASS</em></strong> - 
Fully qualified class name and class should be Kryo serializable.</p>
+<ul>
+<li>Mandatory attribute</li>
+<li>Type of output tuple.</li>
+</ul>
+</li>
+</ul>
+<h3 id="application-example">Application Example</h3>
+<hr />
+<p>Please refer to the <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/transform">Example</a>
 for a sample transform application.</p>
+<h3 id="partitioning">Partitioning</h3>
+<hr />
+<p>Being stateless, this operator can be partitioned using any of the built-in 
partitioners present in the Malhar library by setting a few properties as 
follows:</p>
+<h4 id="stateless-partitioning">Stateless partitioning</h4>
+<p>Stateless partitioning will ensure that TransformOperator will be 
partitioned right at the start of the application and will remain 
partitioned throughout the lifetime of the DAG.
+TransformOperator can be stateless partitioned by adding following lines to 
properties.xml:</p>
+<pre><code class="xml">  &lt;property&gt;
+    &lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+    
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:{N}&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<p>where {OperatorName} is the name of the TransformOperator operator and
+      {N} is the number of static partitions.
+Above lines will partition TransformOperator statically {N} times. </p>
+<h4 id="dynamic-partitioning">Dynamic Partitioning</h4>
+<p>Dynamic partitioning is a feature of the Apex platform which changes the 
partitioning of the operator based on certain conditions.
+TransformOperator can be dynamically partitioned using the below two 
partitioners:</p>
+<h5 id="throughput-based">Throughput based</h5>
+<p>The following code can be added to the populateDAG(DAG dag, Configuration conf) 
method of the application to dynamically partition TransformOperator:</p>
+<pre><code 
class="java">StatelessThroughputBasedPartitioner&lt;TransformOperator&gt; 
partitioner = new StatelessThroughputBasedPartitioner&lt;&gt;();
+partitioner.setCooldownMillis(10000);
+partitioner.setMaximumEvents(30000);
+partitioner.setMinimumEvents(10000);
+dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{partitioner}));
+dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);
+</code></pre>
+
+<p>Above code will dynamically partition TransformOperator when the throughput 
changes.
+If the overall throughput of TransformOperator rises above 30000 or falls below 
10000, the platform will repartition TransformOperator 
+to balance throughput of a single partition to be between 10000 and 30000.
+CooldownMillis of 10000 will be used as the threshold time for which the 
throughput change is observed.</p>
+<p>Source code for this dynamic application can be found <a 
href="https://github.com/DataTorrent/examples/blob/master/tutorials/transform/src/main/java/com/example/transform/DynamicTransformApplication.java">here</a>.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../csvformatter/" class="btn btn-neutral float-right" 
title="Csv Formatter">Next <span class="icon 
icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../jsonFormatter/" class="btn btn-neutral" title="Json 
Formatter"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../jsonFormatter/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../csvformatter/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

Reply via email to