http://git-wip-us.apache.org/repos/asf/apex-site/blob/d7a21b68/content/docs/malhar-3.6/operators/block_reader/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.6/operators/block_reader/index.html 
b/content/docs/malhar-3.6/operators/block_reader/index.html
new file mode 100644
index 0000000..cfb2567
--- /dev/null
+++ b/content/docs/malhar-3.6/operators/block_reader/index.html
@@ -0,0 +1,470 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Block Reader - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Block Reader";
+    var mkdocs_page_input_path = "operators/block_reader.md";
+    var mkdocs_page_url = "/operators/block_reader/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Block Reader</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#block-reader">Block 
Reader</a></li>
+                
+                    <li><a class="toctree-l4" href="#why-is-it-needed">Why is 
it needed?</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-diagram">Class 
Diagram</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#abstractblockreader">AbstractBlockReader</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#example-application">Example Application</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#abstractfsreadaheadlinereader">AbstractFSReadAheadLineReader</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#readaheadlinereadercontext">ReadAheadLineReaderContext</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#abstractfslinereader">AbstractFSLineReader</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#linereadercontext">LineReaderContext</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#fsslicereader">FSSliceReader</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#fixedbytesreadercontext">FixedBytesReaderContext</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#configuration_1">Configuration</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#partitioner-and-statslistener">Partitioner and StatsListener</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#processstats">processStats </a></li>
+                
+                    <li><a class="toctree-l4" 
href="#definepartitions">definePartitions</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">Csv Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Block Reader</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="block-reader">Block Reader</h1>
+<p>This is a scalable operator that reads and parses blocks of data sources 
into records. A data source can be a file or a message bus that contains 
records and a block defines a chunk of data in the source by specifying the 
block offset and the length of the source belonging to the block. </p>
+<h2 id="why-is-it-needed">Why is it needed?</h2>
+<p>A Block Reader is needed to parallelize reading and parsing of a single 
data source, for example a file. Simple parallelism of reading data sources can 
be achieved by multiple partitions reading different sources of the same type (for 
files see <a 
href="https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java">AbstractFileInputOperator</a>)
 but Block Reader partitions can read blocks of the same source in parallel and 
parse them for records ensuring that no record is duplicated or missed.</p>
+<h2 id="class-diagram">Class Diagram</h2>
+<p><img alt="BlockReader class diagram" 
src="../images/blockreader/classdiagram.png" /></p>
+<h2 id="abstractblockreader">AbstractBlockReader</h2>
+<p>This is the abstract implementation that serves as the base for different 
types of data sources. It defines how a block metadata is processed. The flow 
diagram below describes the processing of a block metadata.</p>
+<p><img alt="BlockReader flow diagram" 
src="../images/blockreader/flowdiagram.png" /></p>
+<h3 id="ports">Ports</h3>
+<ul>
+<li>
+<p>blocksMetadataInput: input port on which block metadata are received.</p>
+</li>
+<li>
+<p>blocksMetadataOutput: output port on which block metadata are emitted if 
the port is connected. This port is useful when a downstream operator that 
receives records from block reader may also be interested to know the details 
of the corresponding blocks.</p>
+</li>
+<li>
+<p>messages: output port on which tuples of type 
<code>com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord</code> are 
emitted. This class encapsulates a <code>record</code> and the 
<code>blockId</code> of the corresponding block.</p>
+</li>
+</ul>
+<h3 id="readercontext">readerContext</h3>
+<p>This is one of the most important fields in the block reader. It is of type 
<code>com.datatorrent.lib.io.block.ReaderContext</code> and is responsible for 
fetching bytes that make a record. It also lets the reader know how many total 
bytes were consumed which may not be equal to the total bytes in a record 
because consumed bytes also include bytes for the record delimiter which may 
not be a part of the actual record.</p>
+<p>Once the reader creates an input stream for the block (or uses the previously 
opened stream if the current block is the successor of the previous block) it 
initializes the reader context by invoking 
<code>readerContext.initialize(stream, blockMetadata, 
consecutiveBlock);</code>. Initialize method is where any implementation of 
<code>ReaderContext</code> can perform all the operations which have to be 
executed just before reading the block or create states which are used during 
the lifetime of reading the block.</p>
+<p>Once the initialization is done, <code>readerContext.next()</code> is 
called repeatedly until it returns <code>null</code>. It is left to the 
<code>ReaderContext</code> implementations to decide when a block is completely 
processed. In cases when a record is split across adjacent blocks, reader 
context may decide to read ahead of the current block boundary to completely 
fetch the split record (examples- <code>LineReaderContext</code> and 
<code>ReadAheadLineReaderContext</code>). In other cases when there isn't a 
possibility of split record (example- <code>FixedBytesReaderContext</code>), it 
returns <code>null</code> immediately when the block boundary is reached. The 
return type of <code>readerContext.next()</code> is of type 
<code>com.datatorrent.lib.io.block.ReaderContext.Entity</code> which is just a 
wrapper for a <code>byte[]</code> that represents the record and total bytes 
used in fetching the record.</p>
+<h3 id="abstract-methods">Abstract methods</h3>
+<ul>
+<li>
+<p><code>STREAM setupStream(B block)</code>: creating a stream for a block is 
dependent on the type of source which is not known to AbstractBlockReader. 
Sub-classes which deal with a specific data source provide this 
implementation.</p>
+</li>
+<li>
+<p><code>R convertToRecord(byte[] bytes)</code><a name="convertToRecord"></a>: 
this converts the array of bytes into the actual instance of record type.</p>
+</li>
+</ul>
+<h3 id="auto-scalability">Auto-scalability</h3>
+<p>Block reader can auto-scale, that is, depending on the backlog (total 
number of all the blocks which are waiting in the 
<code>blocksMetadataInput</code> port queue of all partitions) it can create 
more partitions or reduce them. Details are discussed in the last section which 
covers the <a href="#partitioner-and-statslistener">partitioner and stats-listener</a>.</p>
+<h3 id="configuration">Configuration</h3>
+<ol>
+<li><a name="maxReaders"></a><strong>maxReaders</strong>: when auto-scaling is 
enabled, this controls the maximum number of block reader partitions that can 
be created.</li>
+<li><a name="minReaders"></a><strong>minReaders</strong>: when auto-scaling is 
enabled, this controls the minimum number of block reader partitions that 
should always exist.</li>
+<li><a name="collectStats"></a><strong>collectStats</strong>: this enables or 
disables auto-scaling. When it is set to <code>true</code> the stats (number of 
blocks in the queue) are collected and this triggers partitioning; otherwise 
auto-scaling is disabled.</li>
+<li><strong>intervalMillis</strong>: when auto-scaling is enabled, this 
specifies the interval at which the reader will trigger the logic of computing 
the backlog and auto-scale.</li>
+</ol>
+<h2 id="abstractfsblockreader"><a name="AbstractFSBlockReader"></a> 
AbstractFSBlockReader</h2>
+<p>This abstract implementation deals with files. Different types of file 
systems that are implementations of 
<code>org.apache.hadoop.fs.FileSystem</code> are supported. The user can 
override <code>getFSInstance()</code> method to create an instance of a 
specific <code>FileSystem</code>. By default, filesystem instance is created 
from the filesystem URI that comes from the default Hadoop configuration.</p>
+<pre><code class="java">protected FileSystem getFSInstance() throws IOException
+{
+  return FileSystem.newInstance(configuration);
+}
+</code></pre>
+
+<p>It uses this filesystem instance to setup a stream of type 
<code>org.apache.hadoop.fs.FSDataInputStream</code> to read the block.</p>
+<pre><code class="java">@Override
+protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) 
throws IOException
+{
+  return fs.open(new Path(block.getFilePath()));
+}
+</code></pre>
+
+<p>All the ports and configurations are derived from the super class. It 
doesn't provide an implementation of <a 
href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> method 
which is delegated to concrete sub-classes.</p>
+<h3 id="example-application">Example Application</h3>
+<p>This simple dag demonstrates how any concrete implementation of 
<code>AbstractFSBlockReader</code> can be plugged into an application. </p>
+<p><img alt="Application with FSBlockReader" 
src="../images/blockreader/fsreaderexample.png" /></p>
+<p>In the above application, file splitter creates block metadata for files 
which are sent to the block reader. Partitions of the block reader parse the file 
blocks for records which are filtered, transformed and then persisted to a file 
(created per block). Therefore block reader is parallel partitioned with the 2 
downstream operators - filter/converter and record output operator. The code 
which implements this dag is below.</p>
+<pre><code class="java">public class ExampleApplication implements 
StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput input = dag.addOperator(&quot;File-splitter&quot;, new 
FileSplitterInput());
+    //any concrete implementation of AbstractFSBlockReader based on the 
use-case can be added here.
+    LineReader blockReader = dag.addOperator(&quot;Block-reader&quot;, new 
LineReader());
+    Filter filter = dag.addOperator(&quot;Filter&quot;, new Filter());
+    RecordOutputOperator recordOutputOperator = 
dag.addOperator(&quot;Record-writer&quot;, new RecordOutputOperator());
+
+    dag.addStream(&quot;file-block metadata&quot;, input.blocksMetadataOutput, 
blockReader.blocksMetadataInput);
+    dag.addStream(&quot;records&quot;, blockReader.messages, filter.input);
+    dag.addStream(&quot;filtered-records&quot;, filter.output, 
recordOutputOperator.input);
+  }
+
+  /**
+   * Concrete implementation of {@link AbstractFSBlockReader} for which a 
record is a line in the file.
+   */
+  public static class LineReader extends 
AbstractFSBlockReader.AbstractFSReadAheadLineReader&lt;String&gt;
+  {
+
+    @Override
+    protected String convertToRecord(byte[] bytes)
+    {
+      return new String(bytes);
+    }
+  }
+
+  /**
+   * Considers any line starting with a '.' as invalid. Emits the valid 
records.
+   */
+  public static class Filter extends BaseOperator
+  {
+    public final transient 
DefaultOutputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt; output 
= new DefaultOutputPort&lt;&gt;();
+    public final transient 
DefaultInputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt; input = 
new DefaultInputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt;()
+    {
+      @Override
+      public void process(AbstractBlockReader.ReaderRecord&lt;String&gt; 
stringRecord)
+      {
+        //filter records and transform
+        //if the string starts with a '.' ignore the string.
+        if (!StringUtils.startsWith(stringRecord.getRecord(), &quot;.&quot;)) {
+          output.emit(stringRecord);
+        }
+      }
+    };
+  }
+
+  /**
+   * Persists the valid records to corresponding block files.
+   */
+  public static class RecordOutputOperator extends 
AbstractFileOutputOperator&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt;
+  {
+    @Override
+    protected String 
getFileName(AbstractBlockReader.ReaderRecord&lt;String&gt; tuple)
+    {
+      return Long.toHexString(tuple.getBlockId());
+    }
+
+    @Override
+    protected byte[] 
getBytesForTuple(AbstractBlockReader.ReaderRecord&lt;String&gt; tuple)
+    {
+      return tuple.getRecord().getBytes();
+    }
+  }
+}
+</code></pre>
+
+<p>Configuration to parallel partition block reader with its downstream 
operators.</p>
+<pre><code class="xml">  &lt;property&gt;
+    
&lt;name&gt;dt.operator.Filter.port.input.attr.PARTITION_PARALLEL&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+  &lt;property&gt;
+    
&lt;name&gt;dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<h2 id="abstractfsreadaheadlinereader">AbstractFSReadAheadLineReader</h2>
+<p>This extension of <a 
href="#AbstractFSBlockReader"><code>AbstractFSBlockReader</code></a> parses 
lines from a block and binds the <code>readerContext</code> field to an 
instance of <code>ReaderContext.ReadAheadLineReaderContext</code>.</p>
+<p>It is abstract because it doesn't provide an implementation of <a 
href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> since 
the user may want to convert the bytes that make a line into some other type. 
</p>
+<h3 id="readaheadlinereadercontext">ReadAheadLineReaderContext</h3>
+<p>In order to handle a line split across adjacent blocks, 
ReadAheadLineReaderContext always reads beyond the block boundary and ignores 
the bytes till the first end-of-line character of all the blocks except the 
first block of the file. This ensures that no line is missed or incomplete.</p>
+<p>This is one of the most common ways of handling a split record. It doesn't 
require any further information to decide if a line is complete. However, the 
cost of this consistent way to handle a line split is that it always reads from 
the next block.</p>
+<h2 id="abstractfslinereader">AbstractFSLineReader</h2>
+<p>Similar to <code>AbstractFSReadAheadLineReader</code>, this also parses 
lines from a block. However, it binds the <code>readerContext</code> field to 
an instance of <code>ReaderContext.LineReaderContext</code>.</p>
+<h3 id="linereadercontext">LineReaderContext</h3>
+<p>This handles the line split differently from 
<code>ReadAheadLineReaderContext</code>. It doesn't always read from the next 
block. If the end of the last line is aligned with the block boundary then it 
stops processing the block. It does read from the next block when the 
boundaries are not aligned, that is, last line extends beyond the block 
boundary. The result of this is an inconsistency in reading the next block.</p>
+<p>When the boundary of the last line of the previous block was aligned with 
its block, then the first line of the current block is a valid line. However, 
in the other case the bytes from the block start offset to the first 
end-of-line character should be ignored. Therefore, this means that any record 
formed by this reader context has to be validated. For example, if the lines 
are of fixed size then size of each record can be validated or if each line 
begins with a special field then that knowledge can be used to check if a 
record is complete.</p>
+<p>If the validation of completeness fails for a line then <a 
href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> should 
return null.</p>
+<h2 id="fsslicereader">FSSliceReader</h2>
+<p>A concrete extension of <a 
href="#AbstractFSBlockReader"><code>AbstractFSBlockReader</code></a> that reads 
fixed-size <code>byte[]</code> from a block and emits the byte array wrapped in 
<code>com.datatorrent.netlet.util.Slice</code>.</p>
+<p>This operator binds the <code>readerContext</code> to an instance of 
<code>ReaderContext.FixedBytesReaderContext</code>.</p>
+<h3 id="fixedbytesreadercontext">FixedBytesReaderContext</h3>
+<p>This implementation of <code>ReaderContext</code> never reads beyond a 
block boundary, which can result in the last <code>byte[]</code> of a block 
being shorter than the rest of the records.</p>
+<h3 id="configuration_1">Configuration</h3>
+<p><strong>readerContext.length</strong>: length of each record. By default, 
this is initialized to the default hdfs block size.</p>
+<h2 id="partitioner-and-statslistener">Partitioner and StatsListener</h2>
+<p>The logical instance of the block reader acts as the Partitioner (unless a 
custom partitioner is set using the operator attribute - 
<code>PARTITIONER</code>) as well as a StatsListener. This is because the 
+<code>AbstractBlockReader</code> implements both the 
<code>com.datatorrent.api.Partitioner</code> and 
<code>com.datatorrent.api.StatsListener</code> interfaces and provides an 
implementation of <code>definePartitions(...)</code> and 
<code>processStats(...)</code> which make it auto-scalable.</p>
+<h3 id="processstats">processStats <a name="processStats"></a></h3>
+<p>The application master invokes <code>Response 
processStats(BatchedOperatorStats stats)</code> method on the logical instance 
with the stats (<code>tuplesProcessedPSMA</code>, 
<code>tuplesEmittedPSMA</code>, <code>latencyMA</code>, etc.) of each 
partition. The data which this operator is interested in is the 
<code>queueSize</code> of the input port <code>blocksMetadataInput</code>.</p>
+<p>Usually the <code>queueSize</code> of an input port gives the count of 
waiting control tuples plus data tuples. However, if a stats listener is 
interested only in the count of data tuples then that can be expressed by 
annotating the class with <code>@DataQueueSize</code>. In this case 
<code>AbstractBlockReader</code> itself is the <code>StatsListener</code> which 
is why it is annotated with <code>@DataQueueSize</code>.</p>
+<p>The logical instance caches the queue size per partition and at regular 
intervals (configured by <code>intervalMillis</code>) sums these values to find 
the total backlog which is then used to decide whether re-partitioning is 
needed. The flow-diagram below describes this logic.</p>
+<p><img alt="Processing of total-backlog" 
src="../images/blockreader/totalBacklogProcessing.png" /></p>
+<p>The goal of this logic is to create as many partitions as possible within bounds (see 
<a href="#maxReaders"><code>maxReaders</code></a> and <a 
href="#minReaders"><code>minReaders</code></a> above) to quickly reduce this 
backlog or if the backlog is small then remove any idle partitions.</p>
+<h3 id="definepartitions">definePartitions</h3>
+<p>Based on the <code>repartitionRequired</code> field of the 
<code>Response</code> object which is returned by <em><a 
href="#processStats">processStats</a></em> method, the application master 
invokes </p>
+<pre><code 
class="java">Collection&lt;Partition&lt;AbstractBlockReader&lt;...&gt;&gt;&gt; 
definePartitions(Collection&lt;Partition&lt;AbstractBlockReader&lt;...&gt;&gt;&gt;
 partitions, PartitioningContext context)
+</code></pre>
+
+<p>on the logical instance which is also the partitioner instance. The 
implementation calculates the difference between required partitions and the 
existing count of partitions. If this difference is negative, then an equivalent 
number of partitions is removed; otherwise new partitions are created. </p>
+<p>Please note auto-scaling can be disabled by setting <a 
href="#collectStats"><code>collectStats</code></a> to <code>false</code>. If 
the use-case requires only static partitioning, then that can be achieved by 
setting <a 
href="https://github.com/chandnisingh/incubator-apex-core/blob/master/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java"><code>StatelessPartitioner</code></a>
 as the operator attribute- <code>PARTITIONER</code> on the block reader.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../fsInputOperator/" class="btn btn-neutral float-right" 
title="File Input">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../file_splitter/" class="btn btn-neutral" title="File 
Splitter"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../file_splitter/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../fsInputOperator/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/d7a21b68/content/docs/malhar-3.6/operators/csvParserOperator/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.6/operators/csvParserOperator/index.html 
b/content/docs/malhar-3.6/operators/csvParserOperator/index.html
new file mode 100644
index 0000000..e559bdd
--- /dev/null
+++ b/content/docs/malhar-3.6/operators/csvParserOperator/index.html
@@ -0,0 +1,559 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Csv Parser - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Csv Parser";
+    var mkdocs_page_input_path = "operators/csvParserOperator.md";
+    var mkdocs_page_url = "/operators/csvParserOperator/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Csv Parser</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#csv-parser-operator">Csv 
Parser Operator</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operator-objective">Operator Objective</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#overview">Overview</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-diagram">Class 
Diagram</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operator-information">Operator Information</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#platform-attributes-that-influences-operator-behavior">Platform 
Attributes that influences operator behavior</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#example">Example</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Csv Parser</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="csv-parser-operator">Csv Parser Operator</h1>
+<h2 id="operator-objective">Operator Objective</h2>
+<p>This operator is designed to parse delimited records and construct a map or 
concrete java class also known as <a 
href="https://en.wikipedia.org/wiki/Plain_Old_Java_Object">"POJO"</a> out of 
it. Users need to provide the schema to describe the delimited data. Based on 
schema definition the operator will parse the incoming record to object map and 
POJO.  User can also provide constraints if any, in the schema. The supported 
constraints are listed in <a href="#constraints">constraints table</a>. The 
incoming record will be validated against those constraints. Valid records will 
be emitted as POJO / map while invalid ones are emitted on error port with 
error message.</p>
+<p><strong>Note</strong>: field names of POJO must match field names in schema 
and in the same order as it appears in the incoming data.</p>
+<h2 id="overview">Overview</h2>
+<p>The operator is <strong>idempotent</strong>, 
<strong>fault-tolerant</strong> and <strong>partitionable</strong>.</p>
+<h2 id="class-diagram">Class Diagram</h2>
+<p><img alt="" src="../images/csvParser/CSVParser.png" /></p>
+<h2 id="operator-information">Operator Information</h2>
+<ol>
+<li>Operator location:<strong><em>malhar-contrib</em></strong></li>
+<li>Available since:<strong><em>3.2.0</em></strong></li>
+<li>Operator state:<strong><em>Evolving</em></strong></li>
+<li>Java Package:<a 
href="https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java">com.datatorrent.contrib.parser.CsvParser</a></li>
+</ol>
+<h2 id="properties-of-csv-parser"><a name="props"></a>Properties of Csv 
Parser</h2>
+<p>Users need to set the schema which describes delimited data as well as 
specifies constraints on values if any.
+e.g.</p>
+<pre><code class="json">{
+  &quot;separator&quot;:&quot;,&quot;,
+  &quot;quoteChar&quot;:&quot;\&quot;&quot;,
+  &quot;fields&quot;:[
+    {
+      &quot;name&quot;:&quot;adId&quot;,
+      &quot;type&quot;:&quot;Integer&quot;,
+      &quot;constraints&quot;:{
+         &quot;required&quot;:&quot;true&quot;
+      }
+    },
+    {
+      &quot;name&quot;:&quot;adName&quot;,
+      &quot;type&quot;:&quot;String&quot;,
+      &quot;constraints&quot;:{
+         &quot;required&quot;:&quot;true&quot;,
+         &quot;pattern&quot;:&quot;[a-z].*[a-z]$&quot;,
+         &quot;maxLength&quot;:&quot;10&quot;
+      }
+    },
+    {
+      &quot;name&quot;:&quot;bidPrice&quot;,
+      &quot;type&quot;:&quot;Double&quot;,
+      &quot;constraints&quot;:{
+         &quot;required&quot;:&quot;true&quot;,
+         &quot;minValue&quot;:&quot;0.1&quot;,
+         &quot;maxValue&quot;:&quot;3.2&quot;
+      }
+    },
+    {
+      &quot;name&quot;:&quot;startDate&quot;,
+      &quot;type&quot;:&quot;Date&quot;,
+      &quot;constraints&quot;:{
+         &quot;format&quot;:&quot;yyyy-MM-dd HH:mm:ss&quot;
+      }
+    }
+  ]
+}
+</code></pre>
+
+<table>
+<thead>
+<tr>
+<th><strong>Property</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+<th><strong>Default Value</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>schema</em></td>
+<td><a 
href="https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java">Schema</a>
  describing delimited data</td>
+<td>String</td>
+<td>YES</td>
+<td>N/A</td>
+</tr>
+</tbody>
+</table>
+<h2 id="platform-attributes-that-influences-operator-behavior">Platform 
Attributes that influences operator behavior</h2>
+<table>
+<thead>
+<tr>
+<th><strong>Attribute</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>out.TUPLE_CLASS</em></td>
+<td>TUPLE_CLASS attribute on output port which tells operator the class of 
POJO which needs to be emitted</td>
+<td>Class</td>
+<td>Yes</td>
+</tr>
+</tbody>
+</table>
+<h2 id="supported-datatypes-in-schema"><a name="dataTypes"></a>Supported 
DataTypes in Schema</h2>
+<ul>
+<li>Integer</li>
+<li>Long</li>
+<li>Double</li>
+<li>Character</li>
+<li>String</li>
+<li>Boolean</li>
+<li>Date</li>
+<li>Float</li>
+</ul>
+<h2 id="schema-constraints"><a name="constraints"></a>Schema Constraints</h2>
+<table>
+<thead>
+<tr>
+<th><strong>DataType</strong></th>
+<th><strong>Constraints</strong></th>
+<th><strong>Description</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>All data Types</em></td>
+<td>required</td>
+<td>If mentioned, indicates that the data type constraints that follow are 
required. It cannot be blank/null. It may or may not satisfy other constraints 
( like equals/minVal/maxVal etc )</td>
+</tr>
+<tr>
+<td><em>All data Types</em></td>
+<td>equals</td>
+<td>If mentioned, indicates that the data string or value declared in the data 
type constraints must be an exact match with the specified value. <code>Note: 
This constraint is not applicable for data types boolean and date</code></td>
+</tr>
+<tr>
+<td><em>String</em></td>
+<td>Length</td>
+<td>The string must be of the length that is specified.</td>
+</tr>
+<tr>
+<td><em>String</em></td>
+<td>minLength</td>
+<td>The string is at least the length specified as minLength value.</td>
+</tr>
+<tr>
+<td><em>String</em></td>
+<td>maxLength</td>
+<td>The string can be at the most the length specified as maxLength value.</td>
+</tr>
+<tr>
+<td><em>String</em></td>
+<td>pattern</td>
+<td>The string must match the specified regular expression.</td>
+</tr>
+<tr>
+<td><em>Long</em></td>
+<td>maxValue</td>
+<td>The numeric can be at the most the value specified as maxValue.</td>
+</tr>
+<tr>
+<td><em>Long</em></td>
+<td>minValue</td>
+<td>The numeric is at least the value specified as minValue.</td>
+</tr>
+<tr>
+<td><em>Double</em></td>
+<td>maxValue</td>
+<td>The numeric can be at the most the value specified as maxValue.</td>
+</tr>
+<tr>
+<td><em>Double</em></td>
+<td>minValue</td>
+<td>The numeric is at least the value specified as minValue.</td>
+</tr>
+<tr>
+<td><em>Float</em></td>
+<td>maxValue</td>
+<td>The numeric can be at the most the value specified as maxValue.</td>
+</tr>
+<tr>
+<td><em>Float</em></td>
+<td>minValue</td>
+<td>The numeric is at least the value specified as minValue.</td>
+</tr>
+<tr>
+<td><em>Integer</em></td>
+<td>maxValue</td>
+<td>The numeric can be at the most the value specified as maxValue.</td>
+</tr>
+<tr>
+<td><em>Integer</em></td>
+<td>minValue</td>
+<td>The numeric is at least the value specified as minValue.</td>
+</tr>
+<tr>
+<td><em>Date</em></td>
+<td>format</td>
+<td>A simple date format as specified in the SimpleDateFormat class: 
http://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html?is-external=true</td>
+</tr>
+<tr>
+<td><em>Boolean</em></td>
+<td>trueValue</td>
+<td>String for which boolean value is true. The default values are: true, 1, 
y, and t. <code>Note: If you specify trueValue, you must also specify 
falseValue.</code></td>
+</tr>
+<tr>
+<td><em>Boolean</em></td>
+<td>falseValue</td>
+<td>String for which boolean value is false. The default values are: false, 0, 
n, and f. <code>Note: If you specify falseValue, you must also specify 
trueValue.</code></td>
+</tr>
+</tbody>
+</table>
+<h2 id="ports">Ports</h2>
+<table>
+<thead>
+<tr>
+<th><strong>Port</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>in</em></td>
+<td>Tuples that need to be parsed are received on this port</td>
+<td>byte[]</td>
+<td>Yes</td>
+</tr>
+<tr>
+<td><em>out</em></td>
+<td>Valid Tuples that are emitted as pojo</td>
+<td>Object (POJO)</td>
+<td>No</td>
+</tr>
+<tr>
+<td><em>parsedOutput</em></td>
+<td>Valid Tuples that are emitted as map</td>
+<td>Map</td>
+<td>No</td>
+</tr>
+<tr>
+<td><em>err</em></td>
+<td>Invalid Tuples are emitted with error message</td>
+<td>KeyValPair &lt;String, String&gt;</td>
+<td>No</td>
+</tr>
+</tbody>
+</table>
+<h2 id="partitioning">Partitioning</h2>
+<p>CSV Parser is both statically and dynamically partitionable.</p>
+<h3 id="static-partitioning">Static Partitioning</h3>
+<p>This can be achieved in 2 ways</p>
+<ol>
+<li>
+<p>Specifying the partitioner and number of partitions in the populateDAG() 
method
+  <code>java
+  CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
+  StatelessPartitioner&lt;CsvParser&gt; partitioner1 = new 
StatelessPartitioner&lt;CsvParser&gt;(2);
+  dag.setAttribute(csvParser, Context.OperatorContext.PARTITIONER, 
partitioner1);</code></p>
+</li>
+<li>
+<p>Specifying the partitioner in properties file.
+  <code>xml
+   &lt;property&gt;
+     &lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+     
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
+   &lt;/property&gt;</code></p>
+</li>
+</ol>
+<p>where {OperatorName} is the name of the CsvParser operator.
+ The above lines will statically partition CsvParser into 2 partitions. The value can be 
changed accordingly to change the number of static partitions.</p>
+<h3 id="dynamic-paritioning">Dynamic Partitioning</h3>
+<p>CsvParser can be dynamically partitioned using out-of-the-box 
partitioner:</p>
+<h4 id="throughput-based">Throughput based</h4>
+<p>Following code can be added to populateDAG method of application to 
dynamically partition CsvParser:</p>
+<pre><code class="java">CsvParser csvParser = 
dag.addOperator(&quot;csvParser&quot;, CsvParser.class);
+StatelessThroughputBasedPartitioner&lt;CsvParser&gt; partitioner = new 
StatelessThroughputBasedPartitioner&lt;&gt;();
+partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+dag.setAttribute(csvParser, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{partitioner}));
+dag.setAttribute(csvParser, OperatorContext.PARTITIONER, partitioner);
+</code></pre>
+
+<p>Above code will dynamically partition csvParser when the throughput changes.
+If the overall throughput of csvParser goes above 30000 or falls below 10000, 
the platform will repartition CsvParser
+to balance throughput of a single partition to be between 10000 and 30000.
+CooldownMillis of 10000 will be used as the threshold time for which the 
throughput change is observed.</p>
+<h2 id="example">Example</h2>
+<p>Example for Csv Parser can be found at: <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/parser">https://github.com/DataTorrent/examples/tree/master/tutorials/parser</a></p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../file_output/" class="btn btn-neutral float-right" 
title="File Output">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../fsInputOperator/" class="btn btn-neutral" title="File 
Input"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../fsInputOperator/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../file_output/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/d7a21b68/content/docs/malhar-3.6/operators/deduper/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.6/operators/deduper/index.html 
b/content/docs/malhar-3.6/operators/deduper/index.html
new file mode 100644
index 0000000..ef44d07
--- /dev/null
+++ b/content/docs/malhar-3.6/operators/deduper/index.html
@@ -0,0 +1,784 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Deduper - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Deduper";
+    var mkdocs_page_input_path = "operators/deduper.md";
+    var mkdocs_page_url = "/operators/deduper/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">Csv Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Deduper</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a 
href="#deduper-operator-documentation">Deduper - Operator Documentation</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#introduction">Introduction</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#about-this-document">About this document</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#terminology">Terminology</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#overview">Overview</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#dedup-what-in-a-nutshell">Dedup - “What” in a Nutshell</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#dedup-how-in-a-nutshell">Dedup - “How” in a Nutshell</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#use-cases-basic-dedup">Use 
cases - Basic Dedup</a></li>
+                
+                    <li><a class="toctree-l4" href="#dedup-key">Dedup 
Key</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case-details">Use 
case Details</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#use-case-dedup-with-expiry">Use case - Dedup with Expiry</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#motivation">Motivation</a></li>
+                
+                    <li><a class="toctree-l4" href="#expiry-key">Expiry 
Key</a></li>
+                
+                    <li><a class="toctree-l4" href="#expiry-period">Expiry 
Period</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case-details_1">Use 
case Details</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#use-cases-summary">Use cases 
- Summary</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#technical-architecture">Technical Architecture</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-structure">Class 
Structure</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#architectural-details">Architectural Details</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#concepts">Concepts</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#assumptions">Assumptions</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#flow-of-a-tuple-through-dedup-operator">Flow of a Tuple through Dedup 
Operator</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#ports-attributes-and-properties">Ports, Attributes and 
Properties</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#attributes">Attributes</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#properties">Properties</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#example">Example</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#partitioning">Partitioning</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Deduper</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="deduper-operator-documentation">Deduper - Operator 
Documentation</h1>
+<h1 id="introduction">Introduction</h1>
+<h2 id="about-this-document">About this document</h2>
+<p>This document is intended as a guide for understanding and using
+the Dedup operator.</p>
+<h2 id="terminology">Terminology</h2>
+<p>We will refer to this operator as the Deduper or Dedup operator
+interchangeably.</p>
+<h1 id="overview">Overview</h1>
+<h2 id="dedup-what-in-a-nutshell">Dedup - “What” in a Nutshell</h2>
+<p>Dedup is actually short for Deduplication. Duplicates are omnipresent and
+can be found in almost any kind of data. Most of the time it is
+essential to discard, or at the very least separate out the data into
+unique and duplicate components. The entire purpose of this
+operator is to de-duplicate data. In other words, when data passes
+through this operator, it will be segregated into two different data
+sets, one of which contains all unique tuples, and the other which are
+occurring more than once in the original data set.</p>
+<p><img alt="" src="../images/deduper/image00.png" /></p>
+<h2 id="dedup-how-in-a-nutshell">Dedup - “How” in a Nutshell</h2>
+<p>In order to quickly decide whether an incoming tuple is duplicate
+or unique, it has to store each incoming tuple (or a signature, like key,
+for example) to be used for comparison later. A plain in-memory storage
+may work for small datasets, but will not scale for large ones. Deduper 
employs a large scale distributed persistent hashing mechanism (known as the 
Managed State) which allows
+it to identify if a particular tuple is duplicate or unique. Managed state is 
a layer on HDFS which allows all the stored data to be persisted in a 
distributed fashion.
+Each time it identifies a tuple as a unique tuple, it also
+stores it into the Managed state for future
+lookup.</p>
+<h2 id="_1"><img alt="" src="../images/deduper/image04.png" /></h2>
+<p>Following are the different components of the Deduper</p>
+<ol>
+<li><strong>Dedup Operator</strong> - This is responsible for the overall
+    functionality of the operator. This in turn makes use of other
+    components to establish the end goal of deciding whether a tuple is
+    a duplicate of some earlier tuple, or is a unique tuple.</li>
+<li><strong>Managed State</strong> - Since all of the data cannot be stored in
+    memory, this component allows us to persist existing unique keys on
+    HDFS in form of buckets. This is also responsible for fetching data as
    requested by the Deduper. Since it communicates with HDFS, data 
access is slow and so it allows for asynchronous (non-blocking) calls to fetch 
data. This ensures that the Deduper is not blocked and can continue to process 
other tuples. It also supports an in-memory cache where it stores the fetched 
data so that repeated access to the same data is faster. Periodically, based on 
configuration, this also
+    discards data which is no longer needed.</li>
+</ol>
+<p>This was a very basic introduction to the functioning of the
+Deduper. Following sections will go into more detail on each of the
+components.</p>
+<h1 id="use-cases-basic-dedup">Use cases - Basic Dedup</h1>
+<h2 id="dedup-key">Dedup Key</h2>
+<p>A dedup key is a set of one or more fields in the data tuple which
+acts as the key for the tuples.
+This is used by the deduper to compare tuples to arrive at the
+conclusion on whether two tuples are duplicates.</p>
+<p>Consider an example schema and two sample tuples</p>
+<p><code>{Name, Phone, Email, Date, State, Zip, Country}</code></p>
+<p>Tuple 1:</p>
+<pre><code>{
+  Austin U. Saunders,
+  +91-319-340-59385,
+  [email protected],
+  2015-11-09 13:38:38,
+  Texas,
+  73301,
+  United States
+}
+</code></pre>
+
+<p>Tuple 2:</p>
+<pre><code>{
+  Austin U. Saunders,
+  +91-319-340-59385,
+  [email protected],
+  2015-11-09 13:39:38,
+  Texas,
+  73301,
+  United States
+}
+</code></pre>
+
+<p>Let us assume that the Dedup Key
+is <code>{Name, Phone}</code>. In
+this case, the two tuples are duplicates because the key fields are same
+in both the tuples. However, if the Dedup Key is {Phone,Email},
+the two are unique as the email values differ.</p>
+<h2 id="use-case-details">Use case Details</h2>
+<p>Consider the case of de-duplicating a master data set
+which is stored in a file. Further also consider the
+following schema for tuples in the data set.</p>
+<p><code>{Name, Phone, Email, Date, City, Zip, Country}</code></p>
+<p>Also consider that we need to identify unique customers from the
+master data set. So, ultimately the output needed for the use case is
+two data sets - Unique Records and Duplicate Records.</p>
+<p>As part of configuring the operator for this use case, we need to
+set the following parameters:</p>
+<ul>
+<li><strong><em>keyExpression</em></strong> - This can be set as
+    the primary key which can be used to uniquely identify a Customer.
+    For example, we can set it to <code>Name,Email</code></li>
+</ul>
+<p>The above configuration is sufficient to address this use case.</p>
+<h1 id="use-case-dedup-with-expiry">Use case - Dedup with Expiry</h1>
+<h2 id="motivation">Motivation</h2>
+<p>The Basic Dedup use case is the most straightforward and is
+usually applied when the amount of data to be processed is not huge.
+However, if the incoming data is huge, or even never-ending, it is
+usually not necessary to keep storing all the data. This is because in
+most real world use cases, the duplicates occur only a short distance
+apart. Hence, after a while, it is usually okay to forget part of
+the history and consider only limited history for identifying
+duplicates, in the interest of efficiency. In other words, we expire
+(ignore) some tuples which are (or were supposed to be) delivered long
+back. Doing so reduces the load on the storage mechanism (managed state), 
which effectively deletes part of the history, thus making the whole process 
more
+efficient. We call this use case Dedup with Expiry.</p>
+<h2 id="expiry-key">Expiry Key</h2>
+<p>The easiest way to understand this use case is to consider
+time as the criterion for expiring
+tuples. Time is a natural expiry
+key and is in line with the concept of expiry. Formally, an expiry field
+is a field in the input tuple which can be used to discard incoming
+tuples as expired. This expiry key
+usually works with another parameter called Expiry Period defined
+next.</p>
+<h2 id="expiry-period">Expiry Period</h2>
+<p>The expiry period is the value supplied by the user to define the
+extent of history which should be considered while expiring
+tuples.</p>
+<h2 id="use-case-details_1">Use case Details</h2>
+<p>Consider an incoming stream of system logs. The use case requires
+us to identify duplicate log messages and pass on only the unique ones.
+Another relaxation in the use case is that the log messages which are
+older than a day, may not be considered and must be filtered out as
+expired. The expiry must be measured with respect to the time stamp in
+the logs. For example, if the timestamp in the incoming message is
+<code>30-12-2014 00:00:00</code> and the
+latest message that the system has encountered had the time stamp
+<code>31-12-2014 00:00:00</code>, then the
+incoming message must be considered as expired. However, if the incoming
+message had any timestamp like <code>30-12-2014
+00:11:00</code>, it must be accepted into the system and be checked for a 
possible duplicate.</p>
+<p>The expiry facet in the use case above gives us an advantage in
+that we do not have to compare the incoming record with all the data to check 
if it is a duplicate.
+At the same time, all the
+incoming data need not be stored; just a day worth of data is adequate to 
address the above use case.</p>
+<p>Configuring the below parameters will solve the problem for this
+use case:</p>
+<ul>
+<li><strong><em>keyExpression</em></strong> - This is the dedup key for the 
incoming tuples (similar to the Basic Dedup use case). This can be any key 
which can uniquely identify a record. For log messages this can be a serial 
number attached in the log.</li>
+<li><strong><em>timeExpression</em></strong> - This is the key which can help 
identify the expired records, as explained above. In this particular use case, 
it can be a timestamp field which indicates when the log message was 
generated.</li>
+<li><strong><em>expireBefore</em></strong> - This is the period of expiry as 
explained above. In our example use case this will be 24 hours, specified in 
seconds.</li>
+</ul>
+<p>Configuration of the above parameters is sufficient to address this use
+case.</p>
+<h1 id="use-cases-summary">Use cases - Summary</h1>
+<ol>
+<li><strong>Basic Dedup</strong> - Deduplication of
+    bounded datasets. Data is assumed to be bounded. This use case is
+    not meant for never ending streams of data. For example:
+    Deduplication of master data like customer records, product catalogs
+    etc.</li>
+<li><strong>Time Based Dedup</strong> - Deduplication of
+    unlimited streams of data. This use case handles unbounded streams
+    of data and can run forever. An expiry key and criterion is expected
+    as part of the input which helps avoid storing all the unique data.
+    This helps speed up performance. Any timestamp field in the incoming
+    tuple can be used as a time based expiry key.<ul>
+<li><em>With respect to system time</em> - Time progresses with system time. 
Any expiry criteria are executed with the notion of system time. This is 
possible if the incoming tuple does not have a time field, or the user does not 
specify a <code>timeExpression</code>.</li>
+<li><em>With respect to tuple time</em> - Time progresses based on the time 
in the incoming tuples. Expiry criteria are executed with the notion of time 
indicated by the incoming tuple. Specification of the time field 
(<code>timeExpression</code>) is mandatory for this scenario.</li>
+</ul>
+</li>
+</ol>
+<h1 id="technical-architecture">Technical Architecture</h1>
+<h2 id="class-structure">Class Structure</h2>
+<p><img alt="" src="../images/deduper/image03.png" /></p>
+<hr />
+<h2 id="architectural-details">Architectural Details</h2>
+<p><img alt="" src="../images/deduper/image02.png" /></p>
+<h2 id="concepts">Concepts</h2>
+<h3 id="dedup-key-specified-by-keyexpression-parameter">Dedup Key - Specified 
by <em>keyExpression</em> parameter</h3>
+<p>A dedup key is a set of one or more fields in the data tuple which
+acts as the key for the tuples.
+This is used by the deduper to compare tuples to arrive at the
+conclusion on whether two tuples are duplicates. If Dedup Key of two
+tuples match, then they are duplicates, else they are unique.</p>
+<h3 id="expiry-key-specified-by-timeexpression-parameter">Expiry Key - 
Specified by <em>timeExpression</em> parameter</h3>
+<p>A tuple may or may not have an Expiry Key. Dedup operator cannot
+keep storing all the data that is flowing into the operator. At some
+point it becomes essential to discard some of the historical tuples in
+the interest of memory and efficiency.</p>
+<p>At the same time, tuples are expected to arrive at the Dedup
+operator within some time after they are generated. After this time, the
+tuples may be considered as stale or obsolete.</p>
+<p>In such cases, the Deduper considers these tuples as
+expired and takes no action other than
+separating out these tuples on a different port in order to be processed
+by some other operator or stored offline for analysis.</p>
+<p>In order to create a criterion for discarding such tuples, we
+introduce an Expiry Key. Looking at the value of the Expiry Key in each
+tuple, we can decide whether or not to discard this tuple as
+expired.</p>
+<p>The expiry key that we consider in Time Based Dedup is
+time. This usually works with
+another parameter called Expiry Period defined next.</p>
+<h3 id="expiry-period_1">Expiry Period</h3>
+<p>The Expiry Period is the value supplied by the user which decides
+when a particular tuple expires.</p>
+<h3 id="time-points">Time Points</h3>
+<p>For every dataset that the deduper processes, a set of time points is 
maintained:</p>
+<ol>
+<li><em>Latest Point</em> - This is the maximum
+    time point observed in all the processed tuples.</li>
+<li><em>Expiry Point</em> - This is given by:
+    <code>Expiry Point = Latest Point - Expiry Period + 1</code></li>
+</ol>
+<p>These points help the deduper to make decisions related to expiry
+of a tuple.</p>
+<h3 id="example-expiry">Example - Expiry</h3>
+<table>
+<thead>
+<tr>
+<th align="center">Tuple Id</th>
+<th align="center">Expiry Key (Expiry Period = 10)</th>
+<th align="center">Latest Point</th>
+<th align="center">Expiry Point</th>
+<th align="center">Decision for Tuple</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td align="center">1</td>
+<td align="center">10</td>
+<td align="center">10</td>
+<td align="center">1</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">2</td>
+<td align="center">20</td>
+<td align="center">20</td>
+<td align="center">11</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">3</td>
+<td align="center">25</td>
+<td align="center">25</td>
+<td align="center">16</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">4</td>
+<td align="center">40</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">5</td>
+<td align="center">21</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Expired</td>
+</tr>
+<tr>
+<td align="center">6</td>
+<td align="center">35</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">7</td>
+<td align="center">45</td>
+<td align="center">45</td>
+<td align="center">36</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">8</td>
+<td align="center">57</td>
+<td align="center">57</td>
+<td align="center">48</td>
+<td align="center">Not Expired</td>
+</tr>
+</tbody>
+</table>
+<h3 id="time-buckets-a-component-of-managed-state">Time Buckets (A component 
of Managed State)</h3>
+<p>One of the requirements of the Deduper is to store all the unique
+tuples (actually, just the keys of tuples). Keeping an ever growing
+cache in memory is not scalable. So what we need is a limited cache
+backed by a persistent store. When data is requested to be fetched from managed
+state, it is also cached in an in-memory cache. Buckets help
+narrow down the search of duplicates for incoming tuples. A Bucket is an
+abstraction for a collection of tuples all of which share a common hash
+value based on some hash function or a range of time, for example: a
+bucket of data for 5 contiguous minutes. A Bucket has a span property called 
Bucket Span.</p>
+<h3 id="bucket-span">Bucket Span</h3>
+<p>Bucket span is simply the range of the domain
+that is covered by the Bucket. This span is specified in
+the domain of the Expiry key. If the Expiry
+Key is time,  then the Bucket span
+will be specified in seconds. It is
+only defined in case tuples have an Expiry Key.</p>
+<h3 id="number-of-buckets">Number of Buckets</h3>
+<p>The number of buckets can be given by - <code>Num Buckets = Expiry
+Period / Bucket Span</code></p>
+<p>This is because at any point of time, we need only store Expiry
+Period worth of data.</p>
+<h3 id="example-buckets">Example - Buckets</h3>
+<p><img alt="" src="../images/deduper/image01.png" /></p>
+<h2 id="assumptions">Assumptions</h2>
+<h3 id="assumption-1">Assumption 1 <a name="Assumption1"></a></h3>
+<p>This assumption is only applicable in case of Dedup with
+Expiry.</p>
+<p>For any two tuples, t1 and t2 having dedup keys d1 and d2, and
+expiry keys e1 and e2, respectively, the following holds:</p>
+<pre><code>If d1 = d2,
+  then e1 = e2
+</code></pre>
+
+<p>In other words, there may never
+be two tuples t1 and t2 such that:</p>
+<pre><code>Tuple 1: d1, e1
+Tuple 2: d2, e2
+d1 = d2 and e1 != e2
+</code></pre>
+
+<p>In other words, any two tuples with the same dedup key are assumed to have 
the
+same expiry key as well.
+This assumption was made with respect to certain use cases. These
+use cases follow this assumption in that the records which are
+duplicates are exactly identical. An example use case is when log
+messages are replayed erroneously, and we want to identify the duplicate
+log messages. In such cases, we need not worry about two different log
+messages having the same identifier but different timestamps. Since it's
+a replay of the same data, the duplicate records are assumed to be
+exactly identical.</p>
+<p>In case the duplicate tuple has a different value for expiry key, the 
behavior of
+the deduper can be non-deterministic.</p>
+<h2 id="flow-of-a-tuple-through-dedup-operator">Flow of a Tuple through Dedup 
Operator</h2>
+<p>Tuples flow through the Dedup operator one by one. Deduper may process a 
tuple immediately, or store it in some data
+structure for later processing.</p>
+<p>Whenever a tuple arrives at the input
+port of the Dedup operator, the operator performs
+the following tasks.</p>
+<h4 id="check-if-tuple-is-expired">Check if tuple is Expired</h4>
+<p>This is only done in case of Dedup with expiry. The
+following condition is used to check if the tuple is expired.</p>
+<pre><code>if ( Expiry Key &lt; Expiry Point )
+  then Expired
+</code></pre>
+
+<p>If the tuple is expired, then send it to the expired port.</p>
+<h4 id="check-if-tuple-is-a-duplicate-or-unique">Check if tuple is a Duplicate 
or Unique</h4>
+<p>Once a tuple passes the check of expiry, we proceed to check if
+the tuple is a duplicate of some earlier tuple. Note that
+if the tuple in question is not expired, the duplicate will also not
+have expired due to the assumption listed <a href="#Assumption1">here</a>.
+The Deduper queries the Managed state to fetch the value for the tuple key.
+This request is processed by the Managed state in a separate asynchronous 
thread.
+Once this request is submitted, the Deduper moves on to process other
+tuples. Additionally the Deduper also inserts the tuple being processed
+into a waiting events queue for later processing.</p>
+<h4 id="process-pending-tuples">Process pending tuples</h4>
+<p>Once the Deduper has looked at all the tuples in the current window,
+it starts to process the tuples in the waiting queue to finalize the decision
+(unique or duplicate) for these tuples.
+Once the request to Managed state is completed for a tuple and the value is
+fetched from persistent storage, the Deduper can decide if the tuple in
+question is a duplicate or a unique.
+Depending on whether there is enough time left in the current window,
+it can do one of the following:</p>
+<ul>
+<li>Process only the tuples for which the managed state has completed 
processing.
+The tuples which are still being processed by managed state are skipped only 
to come back to them when it can no longer postpone it. This is typically done 
when the operator
+has idle time as there are no tuples on the input ports and the current window
+has still not ended.</li>
+<li>Block on them to complete their processing. This will happen when the 
current
+window has no time left, and the decision cannot be postponed. Note: An 
operator can end its window, only when all the tuples have been completely 
processed.  </li>
+</ul>
+<h1 id="ports-attributes-and-properties">Ports, Attributes and Properties</h1>
+<h2 id="ports">Ports</h2>
+<p>The deduper has a single input port and multiple output
+ports.</p>
+<ul>
+<li><strong><em>input</em></strong> - This is the input port through
+    which the tuples arrive at the Deduper.</li>
+<li><strong><em>unique</em></strong> - This is the output port on
+    which unique tuples are sent out by the Deduper.</li>
+<li><strong><em>duplicate</em></strong> - This is the output port on
+    which duplicate tuples are sent out by the Deduper.</li>
+<li><strong><em>expired</em></strong> - This is the output port on
+    which expired tuples are sent out by the Deduper.</li>
+</ul>
+<p>The user can choose which output ports to connect to the downstream operators.
+All the output ports are optional and can be used as required by the use 
case.</p>
+<h2 id="attributes">Attributes</h2>
+<ul>
+<li><strong><em>Input port Attribute - input.TUPLE_CLASS</em></strong> - 
Class or the fully
+qualified class name.<ul>
+<li>Mandatory attribute</li>
+<li>Tells the operator about the type of the incoming
+tuple.</li>
+</ul>
+</li>
+</ul>
+<h2 id="properties">Properties</h2>
+<ul>
+<li>
+<p><strong><em>keyExpression</em></strong> - String</p>
+<ul>
+<li>Mandatory parameter.</li>
+<li>The java expression to extract the key fields in the incoming tuple 
(POJO)</li>
+</ul>
+</li>
+<li>
+<p><strong><em>timeExpression</em></strong> - String - (Time Based Deduper 
only)</p>
+<ul>
+<li>The java expression to extract the time field in the incoming tuple 
(POJO).</li>
+</ul>
+</li>
+<li>
+<p><strong><em>expireBefore</em></strong> - Long (Seconds) - (Time Based 
Deduper only)</p>
+<ul>
+<li>This is the total time period during which a tuple stays in the system and 
blocks any other tuple with the same key.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>bucketSpan</em></strong> - Long (Seconds) - (Time Based 
Deduper only)</p>
+<ul>
+<li>Mandatory parameter</li>
+<li>This is the unit which describes how large a bucket can be. Typically this 
should be defined depending on the use case. For example, if we have 
expireBefore set to 1 hour, then typically we would be clubbing data in the 
order of minutes, so a <code>bucketSpan</code> of a few minutes would make 
sense. Note that in this case, the entire data worth the 
<code>bucketSpan</code> will expire as a whole. Setting it to 1 minute would 
make the number of time buckets in the system to be 1 hour / 1 minute = 60 
buckets.  Similarly setting bucketSpan to 5 minutes would make number of 
buckets to be 12.</li>
+<li>Note that having too many or too few buckets could have a performance 
impact. If unsure, set the bucketSpan to the square root of 
<code>expireBefore</code>. This way the number of buckets and bucket span are 
balanced.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>referenceInstant</em></strong> -  Long (Seconds) - (Time 
Based Deduper only)</p>
+<ul>
+<li>The reference point from which to start the time which is used for expiry. 
Setting the referenceInstant to say, r seconds from the epoch, would initialize 
the start of expiry to be from that <code>instant = r</code>. The start and end 
of the expiry window periodically move by the span of a single bucket.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>numBuckets</em></strong> -  Integer - (Bounded Deduper 
only)</p>
+<ul>
+<li>Optional parameter, but recommended to be provided by the user.</li>
+<li>This is the number of buckets that need to be used for storing the keys of 
the incoming tuples.</li>
+<li>Users can decide upon the proper value for this parameter by guessing the 
number of distinct keys in the application. A reasonable value is the square 
root of N, where N is the number of distinct keys. If omitted, the Java 
MAX_VALUE for integer is used for N.</li>
+</ul>
+</li>
+</ul>
+<h1 id="example">Example</h1>
+<p>Please refer to <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/dedup">https://github.com/DataTorrent/examples/tree/master/tutorials/dedup</a>
 for
+an example on how to use Deduper.</p>
+<h1 id="partitioning">Partitioning</h1>
+<p>Deduper can be statically partitioned using the operator
+attribute: PARTITIONER</p>
+<p>Add the following property to the properties.xml file:</p>
+<pre><code>&lt;property&gt;
+        
&lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+        
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>This will partition the Dedup operator into 2 static partitions. Change the 
number
+to the required number of partitions.</p>
+<p>Dynamic partitioning is currently not supported in the Deduper.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../windowedOperator/" class="btn btn-neutral float-right" 
title="Windowed Operator">Next <span class="icon 
icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../filter/" class="btn btn-neutral" title="Filter"><span 
class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a 
href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../filter/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../windowedOperator/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

Reply via email to