http://git-wip-us.apache.org/repos/asf/apex-site/blob/afbb4705/content/docs/malhar-3.7/operators/deduper/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.7/operators/deduper/index.html 
b/content/docs/malhar-3.7/operators/deduper/index.html
new file mode 100644
index 0000000..48cf370
--- /dev/null
+++ b/content/docs/malhar-3.7/operators/deduper/index.html
@@ -0,0 +1,862 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Deduper - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Deduper";
+    var mkdocs_page_input_path = "operators/deduper.md";
+    var mkdocs_page_url = "/operators/deduper/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Deduper</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a 
href="#deduper-operator-documentation">Deduper - Operator Documentation</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#introduction">Introduction</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#about-this-document">About this document</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#terminology">Terminology</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#overview">Overview</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#dedup-what-in-a-nutshell">Dedup - “What” in a Nutshell</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#dedup-how-in-a-nutshell">Dedup - “How” in a Nutshell</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#use-cases-basic-dedup">Use 
cases - Basic Dedup</a></li>
+                
+                    <li><a class="toctree-l4" href="#dedup-key">Dedup 
Key</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case-details">Use 
case Details</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#use-case-dedup-with-expiry">Use case - Dedup with Expiry</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#motivation">Motivation</a></li>
+                
+                    <li><a class="toctree-l4" href="#expiry-key">Expiry 
Key</a></li>
+                
+                    <li><a class="toctree-l4" href="#expiry-period">Expiry 
Period</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case-details_1">Use 
case Details</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#use-cases-summary">Use cases 
- Summary</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#technical-architecture">Technical Architecture</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-structure">Class 
Structure</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#architectural-details">Architectural Details</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#concepts">Concepts</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#assumptions">Assumptions</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#flow-of-a-tuple-through-dedup-operator">Flow of a Tuple through Dedup 
Operator</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#ports-attributes-and-properties">Ports, Attributes and 
Properties</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#attributes">Attributes</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#properties">Properties</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#example">Example</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#partitioning">Partitioning</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../s3outputmodule/">S3 Output Module</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Deduper</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="deduper-operator-documentation">Deduper - Operator 
Documentation</h1>
+<h1 id="introduction">Introduction</h1>
+<h2 id="about-this-document">About this document</h2>
+<p>This document is intended as a guide for understanding and using
+the Dedup operator.</p>
+<h2 id="terminology">Terminology</h2>
+<p>We will refer to this operator as the Deduper or Dedup operator
+interchangeably.</p>
+<h1 id="overview">Overview</h1>
+<h2 id="dedup-what-in-a-nutshell">Dedup - “What” in a Nutshell</h2>
+<p>Dedup is actually short for Deduplication. Duplicates are omnipresent and
+can be found in almost any kind of data. Most of the times it is
+essential to discard, or at the very least separate out the data into
+unique and duplicate components. The entire purpose of this
+operator is to de-duplicate data. In other words, when data passes
+through this operator, it will be segregated into two different data
+sets, one of which contains all unique tuples, and the other which are
+occurring more than once in the original data set.</p>
+<p><img alt="" src="../images/deduper/image00.png" /></p>
+<h2 id="dedup-how-in-a-nutshell">Dedup - “How” in a Nutshell</h2>
+<p>In order to quickly decide whether an incoming tuple is duplicate
+or unique, it has to store each incoming tuple (or a signature, like key,
+for example) to be used for comparison later. A plain in-memory storage
+may work for small datasets, but will not scale for large ones. Deduper 
employs a large scale distributed persistent hashing mechanism (known as the 
Managed State) which allows
+it to identify if a particular tuple is duplicate or unique. Managed state is 
a layer on HDFS which allows all the stored data to be persisted in a 
distributed fashion.
+Each time it identifies a tuple as a unique tuple, it also
+stores it into the Managed state for future
+lookup.</p>
+<h2 id="_1"><img alt="" src="../images/deduper/image04.png" /></h2>
+<p>Following are the different components of the Deduper</p>
+<ol>
+<li><strong>Dedup Operator</strong> - This is responsible for the overall
+    functionality of the operator. This in turn makes use of other
+    components to establish the end goal of deciding whether a tuple is
+    a duplicate of some earlier tuple, or is a unique tuple.</li>
+<li><strong>Managed State</strong> - Since, all of the data cannot be stored in
+    memory, this component allows us to persist existing unique keys on
+    HDFS in form of buckets. This is also responsible for fetching data as
+    requested by the Deduper. Since, it communicates with the HDFS, data 
access is slow and so it allows for asynchronous (non-blocking) calls to fetch 
data. This ensures that the Deduper is not blocked and can continue to process 
other tuples. It also supports an in-memory cache where it stores the fetched 
data so that repeated access to the same data is faster. Periodically, based on 
configuration, this also
+    discards data which is no longer needed.</li>
+</ol>
+<p>This was a very basic introduction to the functioning of the
+Deduper. Following sections will go into more detail on each of the
+components.</p>
+<h1 id="use-cases-basic-dedup">Use cases - Basic Dedup</h1>
+<h2 id="dedup-key">Dedup Key</h2>
+<p>A dedup key is a set of one or more fields in the data tuple which
+acts as the key for the tuples.
+This is used by the deduper to compare tuples to arrive at the
+conclusion on whether two tuples are duplicates.</p>
+<p>Consider an example schema and two sample tuples</p>
+<p><code>{Name, Phone, Email, Date, State, Zip, Country}</code></p>
+<p>Tuple 1:</p>
+<pre><code>{
+  Austin U. Saunders,
+  +91-319-340-59385,
+  [email protected],
+  2015-11-09 13:38:38,
+  Texas,
+  73301,
+  United States
+}
+</code></pre>
+
+<p>Tuple 2:</p>
+<pre><code>{
+  Austin U. Saunders,
+  +91-319-340-59385,
+  [email protected],
+  2015-11-09 13:39:38,
+  Texas,
+  73301,
+  United States
+}
+</code></pre>
+
+<p>Let us assume that the Dedup Key
+is <code>{Name, Phone}</code>. In
+this case, the two tuples are duplicates because the key fields are same
+in both the tuples. However, if the Dedup Key is {Phone,Email},
+the two are unique as the email values differ.</p>
+<h2 id="use-case-details">Use case Details</h2>
+<p>Consider the case of de-duplicating a master data set
+which is stored in a file. Further also consider the
+following schema for tuples in the data set.</p>
+<p><code>{Name, Phone, Email, Date, City, Zip, Country}</code></p>
+<p>Also consider that we need to identify unique customers from the
+master data set. So, ultimately the output needed for the use case is
+two data sets - Unique Records and Duplicate Records.</p>
+<p>As part of configuring the operator for this use case, we need to
+set the following parameters:</p>
+<ul>
+<li><strong><em>keyExpression</em></strong> - This can be set as
+    the primary key which can be used to uniquely identify a Customer.
+    For example, we can set it to <code>Name,Email</code></li>
+</ul>
+<p>The above configuration is sufficient to address this use case.</p>
+<h1 id="use-case-dedup-with-expiry">Use case - Dedup with Expiry</h1>
+<h2 id="motivation">Motivation</h2>
+<p>The Basic Dedup use case is the most straightforward and is
+usually applied when the amount of data to be processed is not huge.
+However, if the incoming data is huge, or even never-ending, it is
+usually not necessary to keep storing all the data. This is because in
+most real world use cases, the duplicates occur only a short distance
+apart. Hence, after a while, it is usually okay to forget part of
+the history and consider only limited history for identifying
+duplicates, in the interest of efficiency. In other words, we expire
+(ignore) some tuples which are (or were supposed to be) delivered long
+back. Doing so, reduces the load on the storage mechanism (managed state) 
which effectively deletes part of the history, thus making the whole process 
more
+efficient. We call this use case, Dedup with expiry.</p>
+<h2 id="expiry-key">Expiry Key</h2>
+<p>The easiest way to understand this use case is to consider
+time as the criterion for expiring
+tuples. Time is a natural expiry
+key and is in line with the concept of expiry. Formally, an expiry field
+is a field in the input tuple which can be used to discard incoming
+tuples as expired. This expiry key
+usually works with another parameter called Expiry Period defined
+next.</p>
+<h2 id="expiry-period">Expiry Period</h2>
+<p>The expiry period is the value supplied by the user to define the
+extent of history which should be considered while expiring
+tuples.</p>
+<h2 id="use-case-details_1">Use case Details</h2>
+<p>Consider an incoming stream of system logs. The use case requires
+us to identify duplicate log messages and pass on only the unique ones.
+Another relaxation in the use case is that the log messages which are
+older than a day, may not be considered and must be filtered out as
+expired. The expiry must be measured with respect to the time stamp in
+the logs. For example, if the timestamp in the incoming message is
+<code>30-12-2014 00:00:00</code> and the
+latest message that the system has encountered had the time stamp
+<code>31-12-2014 00:00:00</code>, then the
+incoming message must be considered as expired. However, if the incoming
+message had any timestamp like <code>30-12-2014
+00:11:00</code>, it must be accepted into the system and be checked for a 
possible duplicate.</p>
+<p>The expiry facet in the use case above gives us an advantage in
+that we do not have to compare the incoming record with all the data to check 
if it is a duplicate.
+At the same time, all the
+incoming data need not be stored; just a day worth of data is adequate to 
address the above use case.</p>
+<p>Configuring the below parameters will solve the problem for this
+use case:</p>
+<ul>
+<li><strong><em>keyExpression</em></strong> - This is the dedup key for the 
incoming tuples (similar to the Basic Dedup use case). This can be any key 
which can uniquely identify a record. For log messages this can be a serial 
number attached in the log.</li>
+<li><strong><em>timeExpression</em></strong> - This is the key which can help 
identify the expired records, as explained above. In this particular use case, 
it can be a timestamp field which indicates when the log message was 
generated.</li>
+<li><strong><em>expireBefore</em></strong> - This is the period of expiry as 
explained above. In our example use case this will be 24 hour, specified in 
seconds.</li>
+</ul>
+<p>Configuration of the above parameters is sufficient to address this use
+case.</p>
+<h1 id="use-cases-summary">Use cases - Summary</h1>
+<ol>
+<li><strong>Basic Dedup</strong> - Deduplication of
+    bounded datasets. Data is assumed to be bounded. This use case is
+    not meant for never ending streams of data. For example:
+    Deduplication of master data like customer records, product catalogs
+    etc.</li>
+<li><strong>Time Based Dedup</strong> - Deduplication of
+    unlimited streams of data. This use case handles unbounded streams
+    of data and can run forever. An expiry key and criterion is expected
+    as part of the input which helps avoid storing all the unique data.
+    This helps speed up performance. Any timestamp field in the incoming
+    tuple can be used as a time based expiry key.<ul>
+<li><em>With respect to system time</em> - Time progresses with system time. 
Any expiry criteria are executed with the notion of system time. This is 
possible if the incoming tuple does not have a time field, or the user does not 
specify a <code>timeExpression</code>.</li>
+<li><em>With respect to tuple time</em> - Time progresses based on the time 
in the incoming tuples. Expiry criteria are executed with the notion of time 
indicated by the incoming tuple. Specification of the time field 
(<code>timeExpression</code>) is mandatory for this scenario.</li>
+</ul>
+</li>
+</ol>
+<h1 id="technical-architecture">Technical Architecture</h1>
+<h2 id="class-structure">Class Structure</h2>
+<p><img alt="" src="../images/deduper/image03.png" /></p>
+<hr />
+<h2 id="architectural-details">Architectural Details</h2>
+<p><img alt="" src="../images/deduper/image02.png" /></p>
+<h2 id="concepts">Concepts</h2>
+<h3 id="dedup-key-specified-by-keyexpression-parameter">Dedup Key - Specified 
by <em>keyExpression</em> parameter</h3>
+<p>A dedup key is a set of one or more fields in the data tuple which
+acts as the key for the tuples.
+This is used by the deduper to compare tuples to arrive at the
+conclusion on whether two tuples are duplicates. If Dedup Key of two
+tuples match, then they are duplicates, else they are unique.</p>
+<h3 id="expiry-key-specified-by-timeexpression-parameter">Expiry Key - 
Specified by <em>timeExpression</em> parameter</h3>
+<p>A tuple may or may not have an Expiry Key. Dedup operator cannot
+keep storing all the data that is flowing into the operator. At some
+point it becomes essential to discard some of the historical tuples in
+interest of memory and efficiency.</p>
+<p>At the same time, tuples are expected to arrive at the Dedup
+operator within some time after they are generated. After this time, the
+tuples may be considered as stale or obsolete.</p>
+<p>In such cases, the Deduper considers these tuples as
+expired and takes no action other than
+separating out these tuples on a different port in order to be processed
+by some other operator or stored offline for analysis.</p>
+<p>In order to create a criterion for discarding such tuples, we
+introduce an Expiry Key. Looking at the value of the Expiry Key in each
+tuple, we can decide whether or not to discard this tuple as
+expired.</p>
+<p>The expiry key that we consider in Time Based Dedup is
+time. This usually works with
+another parameter called Expiry Period defined next.</p>
+<h3 id="expiry-period_1">Expiry Period</h3>
+<p>The Expiry Period is the value supplied by the user which decides
+when a particular tuple expires.</p>
+<h3 id="time-points">Time Points</h3>
+<p>For every dataset that the deduper processes, a set of time points is 
maintained:</p>
+<ol>
+<li><em>Latest Point</em> - This is the maximum
+    time point observed in all the processed tuples.</li>
+<li><em>Expiry Point</em> - This is given by:
+    <code>Expiry Point = Latest Point - Expiry Period</code></li>
+</ol>
+<p>These points help the deduper to make decisions related to expiry
+of a tuple.</p>
+<h3 id="example-expiry">Example - Expiry</h3>
+<table>
+<thead>
+<tr>
+<th align="center">Tuple Id</th>
+<th align="center">Expiry Key (Expiry Period = 10)</th>
+<th align="center">Latest Point</th>
+<th align="center">Expiry Point</th>
+<th align="center">Decision for Tuple</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td align="center">1</td>
+<td align="center">10</td>
+<td align="center">10</td>
+<td align="center">1</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">2</td>
+<td align="center">20</td>
+<td align="center">20</td>
+<td align="center">11</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">3</td>
+<td align="center">25</td>
+<td align="center">25</td>
+<td align="center">16</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">4</td>
+<td align="center">40</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">5</td>
+<td align="center">21</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Expired</td>
+</tr>
+<tr>
+<td align="center">6</td>
+<td align="center">35</td>
+<td align="center">40</td>
+<td align="center">31</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">7</td>
+<td align="center">45</td>
+<td align="center">45</td>
+<td align="center">36</td>
+<td align="center">Not Expired</td>
+</tr>
+<tr>
+<td align="center">8</td>
+<td align="center">57</td>
+<td align="center">57</td>
+<td align="center">48</td>
+<td align="center">Not Expired</td>
+</tr>
+</tbody>
+</table>
+<h3 id="time-buckets-a-component-of-managed-state">Time Buckets (A component 
of Managed State)</h3>
+<p>One of the requirements of the Deduper is to store all the unique
+tuples (actually, just the keys of tuples). Keeping an ever growing
+cache in memory is not scalable. So what we need is a limited cache
+backed by a persistent store. When data is requested to be fetched from managed
+state, it is also cached in an in-memory cache. Buckets help
+narrow down the search of duplicates for incoming tuples. A Bucket is an
+abstraction for a collection of tuples all of which share a common hash
+value based on some hash function or a range of time, for example: a
+bucket of data for 5 contiguous minutes. A Bucket has a span property called 
Bucket Span.</p>
+<h3 id="bucket-span">Bucket Span</h3>
+<p>Bucket span is simply the range of the domain
+that is covered by the Bucket. This span is specified in
+the domain of the Expiry key. If the Expiry
+Key is time,  then the Bucket span
+will be specified in seconds. It is
+only defined in case tuples have an Expiry Key.</p>
+<h3 id="number-of-buckets">Number of Buckets</h3>
+<p>The number of buckets can be given by - <code>Num Buckets = Expiry
+Period / Bucket Span</code></p>
+<p>This is because at any point of time, we need only store Expiry
+Period worth of data.</p>
+<h3 id="example-buckets">Example - Buckets</h3>
+<p><img alt="" src="../images/deduper/image01.png" /></p>
+<h2 id="assumptions">Assumptions</h2>
+<h3 id="assumption-1">Assumption 1 <a name="Assumption1"></a></h3>
+<p>This assumption is only applicable in case of Dedup with
+Expiry.</p>
+<p>For any two tuples, t1 and t2 having dedup keys d1 and d2, and
+expiry keys e1 and e2, respectively, the following holds:</p>
+<pre><code>If d1 = d2,
+  then e1 = e2
+</code></pre>
+
+<p>In other words, there may never
+be two tuples t1 and t2 such that:</p>
+<pre><code>Tuple 1: d1, e1
+Tuple 2: d2, e2
+d1 = d2 and e1 != e2
+</code></pre>
+
+<p>In other words, any two tuples with the same dedup key are assumed to have 
the
+same expiry key as well.
+This assumption was made with respect to certain use cases. These
+use cases follow this assumption in that the records which are
+duplicates are exactly identical. An example use case is when log
+messages are replayed erroneously, and we want to identify the duplicate
+log messages. In such cases, we need not worry about two different log
+messages having the same identifier but different timestamps. Since its
+a replay of the same data, the duplicate records are assumed to be
+exactly identical.</p>
+<p>In case the duplicate tuple has a different value for expiry key, the 
behavior of
+the deduper can be non-deterministic.</p>
+<h2 id="flow-of-a-tuple-through-dedup-operator">Flow of a Tuple through Dedup 
Operator</h2>
+<p>Tuples flow through the Dedup operator one by one. Deduper may process a 
tuple immediately, or store it in some data
+structure for later processing.</p>
+<p>When a tuple always arrives at the input
+port of the Dedup operator, it does
+the following tasks.</p>
+<h4 id="check-if-tuple-is-expired">Check if tuple is Expired</h4>
+<p>This is only done in case of Dedup with expiry. The
+following condition is used to check if the tuple is expired.</p>
+<pre><code>if ( Latest Point - Expiry Key &lt; Expiry Point )
+  then Expired
+</code></pre>
+
+<p>If the tuple is expired, then send it to the expired port.</p>
+<h4 id="check-if-tuple-is-a-duplicate-or-unique">Check if tuple is a Duplicate 
or Unique</h4>
+<p>Once a tuple passes the check of expiry, we proceed to check if
+the tuple is a duplicate of some earlier tuple. Note that
+if the tuple in question is not expired, the duplicate will also not
+have expired due to the assumption listed <a href="#Assumption1">here</a>.
+The Deduper queries the Managed state to fetch the value for the tuple key.
+This request is processed by the Managed state in a separate asynchronous 
thread.
+Once this request is submitted, the Deduper moves on to process other
+tuples. Additionally the Deduper also inserts the tuple being processed
+into a waiting events queue for later processing.</p>
+<h4 id="process-pending-tuples">Process pending tuples</h4>
+<p>Once the Deduper has looked at the all the tuples in the current window,
+it starts to process the tuples in the waiting queue to finalize the decision
+(unique or duplicate) for these tuples.
+Once the request to Managed state is completed for a tuple and the value is
+fetched from persistent storage, the Deduper can decide if the tuple in
+question is a duplicate or a unique.
+Depending on whether there is enough time left in the current window,
+it can do one of the following:</p>
+<ul>
+<li>Process only the tuples for which the managed state has completed 
processing.
+The tuples which are still being processed by managed state are skipped only 
to come back to them when it can no longer postpone it. This is typically done 
when the operator
+has idle time as there are no tuples on the input ports and the current window
+has still not ended.</li>
+<li>Block on them to complete their processing. This will happen when the 
current
+window has no time left, and the decision cannot be postponed. Note: An 
operator can end its window, only when all the tuples have been completely 
processed.  </li>
+</ul>
+<h1 id="ports-attributes-and-properties">Ports, Attributes and Properties</h1>
+<h2 id="ports">Ports</h2>
+<p>The deduper has a single input port and multiple output
+ports.</p>
+<ul>
+<li><strong><em>input</em></strong> - This is the input port through
+    which the tuples arrive at the Deduper.</li>
+<li><strong><em>unique</em></strong> - This is the output port on
+    which unique tuples are sent out by the Deduper.</li>
+<li><strong><em>duplicate</em></strong> - This is the output port on
+    which duplicate tuples are sent out by the Deduper.</li>
+<li><strong><em>expired</em></strong> - This is the output port on
+    which expired tuples are sent out by the Deduper.</li>
+</ul>
+<p>The user can choose which output ports to connect the down stream operators.
+All the output ports are optional and can be used as required by the use 
case.</p>
+<h2 id="attributes">Attributes</h2>
+<ul>
+<li><strong><em>Input port Attribute - input.TUPLE_CLASS</em></strong> - 
Class or the fully
+qualified class name.<ul>
+<li>Mandatory attribute</li>
+<li>Tells the operator about the type of the incoming
+tuple.</li>
+</ul>
+</li>
+</ul>
+<h2 id="properties">Properties</h2>
+<ul>
+<li>
+<p><strong><em>keyExpression</em></strong> - String</p>
+<ul>
+<li>Mandatory parameter.</li>
+<li>The java expression to extract the key fields in the incoming tuple 
(POJO)</li>
+</ul>
+</li>
+<li>
+<p><strong><em>timeExpression</em></strong> - String - (Time Based Deduper 
only)</p>
+<ul>
+<li>The java expression to extract the time field in the incoming tuple 
(POJO).</li>
+</ul>
+</li>
+<li>
+<p><strong><em>expireBefore</em></strong> - Long (Seconds) - (Time Based 
Deduper only)</p>
+<ul>
+<li>This is the total time period during which a tuple stays in the system and 
blocks any other tuple with the same key.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>bucketSpan</em></strong> - Long (Seconds) - (Time Based 
Deduper only)</p>
+<ul>
+<li>Mandatory parameter</li>
+<li>This is the unit which describes how large a bucket can be. Typically this 
should be defined depending on the use case. For example, if we have 
expireBefore set to 1 hour, then typically we would be clubbing data in the 
order of minutes, so a <code>bucketSpan</code> of a few minutes would make 
sense. Note that in this case, the entire data worth the 
<code>bucketSpan</code> will expire as a whole. Setting it to 1 minute would 
make the number of time buckets in the system to be 1 hour / 1 minute = 60 
buckets.  Similarly setting bucketSpan to 5 minutes would make number of 
buckets to be 12.</li>
+<li>Note that having too many or too few buckets could have a performance 
impact. If unsure, set the bucketSpan to the square root of 
<code>expireBefore</code>. This way the number of buckets and bucket span are 
balanced.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>referenceInstant</em></strong> -  Long (Seconds) - (Time 
Based Deduper only)</p>
+<ul>
+<li>The reference point from which to start the time which is use for expiry. 
Setting the referenceInstant to say, r seconds from the epoch, would initialize 
the start of expiry to be from that <code>instant = r</code>. The start and end 
of the expiry window periodically move by the span of a single bucket.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>numBuckets</em></strong> -  Integer - (Bounded Deduper 
only)</p>
+<ul>
+<li>Optional parameter, but recommended to be provided by the user.</li>
+<li>This is the number of buckets that need to be used for storing the keys of 
the incoming tuples.</li>
+<li>Users can decide upon the proper value for this parameter by guessing the 
number of distinct keys in the application. A reasonable value is the square 
root of N, where N is the number of distinct keys. If omitted, the Java 
MAX_VALUE for integer is used for N.</li>
+</ul>
+</li>
+</ul>
+<h1 id="example">Example</h1>
+<p>Please refer to <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/dedup";>https://github.com/DataTorrent/examples/tree/master/tutorials/dedup</a>Â
 for
+an example on how to use Deduper.</p>
+<h1 id="partitioning">Partitioning</h1>
+<p>Deduper can be statically partitioned using the operator
+attribute: PARTITIONER</p>
+<p>Add the following property to the properties.xml file:</p>
+<pre><code>&lt;property&gt;
+        
&lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+        
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>This will partition the Dedup operator into 2 static partitions. Change the 
number
+to the required number of partitions.</p>
+<p>Dynamic partitioning is currently not supported in the Deduper.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../enricher/" class="btn btn-neutral float-right" 
title="Enricher">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../csvParserOperator/" class="btn btn-neutral" title="CSV 
Parser"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org";>MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme";>theme</a> provided by <a 
href="https://readthedocs.org";>Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../csvParserOperator/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../enricher/" style="color: 
#fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/afbb4705/content/docs/malhar-3.7/operators/enricher/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.7/operators/enricher/index.html 
b/content/docs/malhar-3.7/operators/enricher/index.html
new file mode 100644
index 0000000..413edbe
--- /dev/null
+++ b/content/docs/malhar-3.7/operators/enricher/index.html
@@ -0,0 +1,648 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Enricher - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Enricher";
+    var mkdocs_page_input_path = "operators/enricher.md";
+    var mkdocs_page_url = "/operators/enricher/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Enricher</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#pojo-enricher">POJO 
Enricher</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operator-objective">Operator Objective</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operator-usecase">Operator Usecase</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#operator-information">Operator Information</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#properties-attributes-and-ports">Properties, Attributes and 
Ports</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#platform-attributes-that-influences-operator-behavior">Platform 
Attributes that influences operator behavior</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+            
+                <li class="toctree-l3"><a 
href="#limitations">Limitations</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#example">Example</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#advanced">Advanced</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#caching-mechanism-in-pojoenricher">Caching mechanism in 
POJOEnricher</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#partitioning-of-pojoenricher">Partitioning of POJOEnricher</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../s3outputmodule/">S3 Output Module</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Enricher</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="pojo-enricher">POJO Enricher</h1>
+<h2 id="operator-objective">Operator Objective</h2>
+<p>This operator receives an POJO (<a 
href="https://en.wikipedia.org/wiki/Plain_Old_Java_Object";>Plain Old Java 
Object</a>) as an incoming tuple and uses an external source to enrich the data 
in 
+the incoming tuple and finally emits the enriched data as a new enriched 
POJO.</p>
+<p>POJOEnricher supports enrichment from following external sources:</p>
+<ol>
+<li><strong>JSON File Based</strong> - Reads the file in memory having content 
stored in JSON format and use that to enrich the data. This can be done using 
FSLoader implementation.</li>
+<li><strong>JDBC Based</strong> - Any JDBC store can act as an external entity 
to which enricher can request data for enriching incoming tuples. This can be 
done using JDBCLoader implementation.</li>
+</ol>
+<p>POJO Enricher does not hold any state and is <strong>idempotent</strong>, 
<strong>fault-tolerant</strong> and <strong>statically/dynamically 
partitionable</strong>.</p>
+<h2 id="operator-usecase">Operator Usecase</h2>
+<ol>
+<li>Bank <strong><em>transaction records</em></strong> usually contains 
customerId. For further analysis of transaction one wants the customer name and 
other customer related information. 
+Such information is present in another database. One could enrich the 
transaction's record with customer information using POJOEnricher.</li>
+<li><strong><em>Call Data Record (CDR)</em></strong> contains only 
mobile/telephone numbers of the customer. Customer information is missing in 
CDR. POJO Enricher can be used to enrich 
+CDR with customer data for further analysis.</li>
+</ol>
+<h2 id="operator-information">Operator Information</h2>
+<ol>
+<li>Operator location: <strong><em>malhar-contrib</em></strong></li>
+<li>Available since: <strong><em>3.4.0</em></strong></li>
+<li>Operator state: <strong><em>Evolving</em></strong></li>
+<li>Java Packages:<ul>
+<li>Operator: <strong><em><a 
href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/POJOEnricher.html";>com.datatorrent.contrib.enrich.POJOEnricher</a></em></strong></li>
+<li>FSLoader: <strong><em><a 
href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/FSLoader.html";>com.datatorrent.contrib.enrich.FSLoader</a></em></strong></li>
+<li>JDBCLoader: <strong><em><a 
href="https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/enrich/JDBCLoader.html";>com.datatorrent.contrib.enrich.JDBCLoader</a></em></strong></li>
+</ul>
+</li>
+</ol>
+<h2 id="properties-attributes-and-ports">Properties, Attributes and Ports</h2>
+<h3 id="properties-of-pojoenricher"><a name="props"></a>Properties of 
POJOEnricher</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Property</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+<th><strong>Default Value</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>includeFields</em></td>
+<td>List of fields from database that needs to be added to output POJO.</td>
+<td>List&lt;String></td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>lookupFields</em></td>
+<td>List of fields from input POJO which will form a <em>unique composite</em> 
key for querying to store</td>
+<td>List&lt;String></td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>store</em></td>
+<td>Backend Store from which data should be queried for enrichment</td>
+<td><a href="#backendStore">BackendStore</a></td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>cacheExpirationInterval</em></td>
+<td>Cache entry expiry in ms. After this time, the lookup to store will be 
done again for given key</td>
+<td>int</td>
+<td>No</td>
+<td>1 * 60 * 60 * 1000 (1 hour)</td>
+</tr>
+<tr>
+<td><em>cacheCleanupInterval</em></td>
+<td>Interval in ms after which cache will be removed for any stale 
entries.</td>
+<td>int</td>
+<td>No</td>
+<td>1 * 60 * 60 * 1000 (1 hour)</td>
+</tr>
+<tr>
+<td><em>cacheSize</em></td>
+<td>Number of entry in cache after which eviction will start on each addition 
based on LRU</td>
+<td>int</td>
+<td>No</td>
+<td>1000</td>
+</tr>
+</tbody>
+</table>
+<h4 id="properties-of-fsloader-backendstore"><a 
name="backendStore"></a>Properties of FSLoader (BackendStore)</h4>
+<table>
+<thead>
+<tr>
+<th><strong>Property</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+<th><strong>Default Value</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>fileName</em></td>
+<td>Path of the file, the data from which will be used for enrichment. See <a 
href="#JSONFileFormat">here</a> for JSON File format.</td>
+<td>String</td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+</tbody>
+</table>
+<h4 id="properties-of-jdbcloader-backendstore">Properties of JDBCLoader 
(BackendStore)</h4>
+<table>
+<thead>
+<tr>
+<th><strong>Property</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+<th><strong>Default Value</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>databaseUrl</em></td>
+<td>Connection string for connecting to JDBC</td>
+<td>String</td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>databaseDriver</em></td>
+<td>JDBC Driver class for connection to JDBC Store. This driver should be 
there in classpath</td>
+<td>String</td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>tableName</em></td>
+<td>Name of the table from which data needs to be retrieved</td>
+<td>String</td>
+<td>Yes</td>
+<td>N/A</td>
+</tr>
+<tr>
+<td><em>connectionProperties</em></td>
+<td>Command seperated list of advanced connection properties that need to be 
passed to JDBC Driver. For eg. <em>prop1:val1,prop2:val2</em></td>
+<td>String</td>
+<td>No</td>
+<td>null</td>
+</tr>
+<tr>
+<td><em>queryStmt</em></td>
+<td>Select statement which will be used to query the data. This is optional 
parameter in case of advanced query.</td>
+<td>String</td>
+<td>No</td>
+<td>null</td>
+</tr>
+</tbody>
+</table>
+<h3 id="platform-attributes-that-influences-operator-behavior">Platform 
Attributes that influences operator behavior</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Attribute</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>input.TUPLE_CLASS</em></td>
+<td>TUPLE_CLASS attribute on input port which tells operator the class of POJO 
which will be incoming</td>
+<td>Class or FQCN</td>
+<td>Yes</td>
+</tr>
+<tr>
+<td><em>output.TUPLE_CLASS</em></td>
+<td>TUPLE_CLASS attribute on output port which tells operator the class of 
POJO which need to be emitted</td>
+<td>Class or FQCN</td>
+<td>Yes</td>
+</tr>
+</tbody>
+</table>
+<h3 id="ports">Ports</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Port</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>input</em></td>
+<td>Tuple which needs to be enriched are received on this port</td>
+<td>Object (POJO)</td>
+<td>Yes</td>
+</tr>
+<tr>
+<td><em>output</em></td>
+<td>Tuples that are enriched from external source are emitted from on this 
port</td>
+<td>Object (POJO)</td>
+<td>No</td>
+</tr>
+</tbody>
+</table>
+<h2 id="limitations">Limitations</h2>
+<p>Current POJOEnricher contains following limitation:</p>
+<ol>
+<li>FSLoader loads the file content in memory. Though it loads only the 
composite key and composite value in memory, a very large amount of data would 
bloat the memory and make the operator go OOM. In case the filesize is large, 
allocate sufficient memory to the POJOEnricher.</li>
+<li>Incoming POJO should be a subset of outgoing POJO.</li>
+<li><a href="#props">includeFields</a> property should contains fields having 
same name in database column as well as outgoing POJO. For eg. If name of the 
database column is "customerName", then outgoing POJO should contains a field 
with the same name and same should be added to includeFields.</li>
+<li><a href="#props">lookupFields</a> property should contains fields having 
same name in database column as well as incoming POJO. For eg. If name of the 
database column is "customerId", then incoming POJO should contains a field 
with the same name and same should be added to lookupFields.</li>
+</ol>
+<h2 id="example">Example</h2>
+<p>Example for POJOEnricher can be found at: <a 
href="https://github.com/DataTorrent/examples/tree/master/tutorials/enricher";>https://github.com/DataTorrent/examples/tree/master/tutorials/enricher</a></p>
+<h2 id="advanced">Advanced</h2>
+<h3 id="file-format-for-json-based-fsloader"><a name="JSONFileFormat"></a> 
File format for JSON based FSLoader</h3>
+<p>FSLoader expects file to be in specific format:</p>
+<ol>
+<li>Each line makes on record which becomes part of the store</li>
+<li>Each line is a valid JSON Object where <em>key</em> is name of the field 
name and <em>value</em> is the field value.</li>
+</ol>
+<p>Example for the format look like following:</p>
+<pre><code class="json">{&quot;circleId&quot;:0, 
&quot;circleName&quot;:&quot;A&quot;}
+{&quot;circleId&quot;:1, &quot;circleName&quot;:&quot;B&quot;}
+{&quot;circleId&quot;:2, &quot;circleName&quot;:&quot;C&quot;}
+{&quot;circleId&quot;:3, &quot;circleName&quot;:&quot;D&quot;}
+{&quot;circleId&quot;:4, &quot;circleName&quot;:&quot;E&quot;}
+{&quot;circleId&quot;:5, &quot;circleName&quot;:&quot;F&quot;}
+{&quot;circleId&quot;:6, &quot;circleName&quot;:&quot;G&quot;}
+{&quot;circleId&quot;:7, &quot;circleName&quot;:&quot;H&quot;}
+{&quot;circleId&quot;:8, &quot;circleName&quot;:&quot;I&quot;}
+{&quot;circleId&quot;:9, &quot;circleName&quot;:&quot;J&quot;}
+</code></pre>
+
+<h3 id="caching-mechanism-in-pojoenricher">Caching mechanism in 
POJOEnricher</h3>
+<p>POJOEnricher contains an cache which makes the lookup for keys more 
efficient. This is specially useful when data in external store is not changing 
much. 
+However, one should carefully tune the <a 
href="#props">cacheExpirationInterval</a> property for desirable results.</p>
+<p>On every incoming tuple, POJOEnricher first queries the cache. If the cache 
contains desired record and is within expiration interval, then it uses that to
+enrich the tuple, otherwise does a lookup to configured store and the return 
value is used to enrich the tuple. The return value is then cached for 
composite key and composite value.</p>
+<p>POJOEnricher only caches the required fields for enrichment mechanism and 
not all fields returned by external store. This ensures optimal use of 
memory.</p>
+<h3 id="partitioning-of-pojoenricher">Partitioning of POJOEnricher</h3>
+<p>Being stateless operator, POJOEnricher will ensure built-in partitioners 
present in Malhar library can be directly simply by setting few properties as 
follows:</p>
+<h4 id="stateless-partioning-of-pojoenricher">Stateless partioning of 
POJOEnricher</h4>
+<p>Stateless partitioning will ensure that POJOEnricher will will be 
partitioned right at the starting of the application and will remain 
partitioned throughout the lifetime of the DAG.
+POJOEnricher can be stateless partitioned by adding following lines to 
properties.xml:</p>
+<pre><code class="xml">  &lt;property&gt;
+    &lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+    
&lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<p>where {OperatorName} is the name of the POJOEnricher operator.
+Above lines will partition POJOEnricher statically 2 times. Above value can be 
changed accordingly to change the number of static partitions.</p>
+<h4 id="dynamic-partitioning-of-pojoenricher">Dynamic Partitioning of 
POJOEnricher</h4>
+<p>Dynamic partitioning is a feature of Apex platform which changes the 
partition of the operator based on certain condition.
+POJOEnricher can be dynamically partitioned using 2 out-of-the-box 
partitioners:</p>
+<h5 id="throughput-based">Throughput based</h5>
+<p>Following code can be added to populateDAG method of application to 
dynamically partitioning POJOEnricher:</p>
+<pre><code class="java">    
StatelessThroughputBasedPartitioner&lt;POJOEnricher&gt; partitioner = new 
StatelessThroughputBasedPartitioner&lt;&gt;();
+    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+    dag.setAttribute(pojoEnricherObj, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{partitioner}));
+    dag.setAttribute(pojoEnricherObj, OperatorContext.PARTITIONER, 
partitioner);
+</code></pre>
+
+<p>Above code will dynamically partition POJOEnricher when the throughput 
changes.
+If the overall throughput of POJOEnricher goes beyond 30000 or less than 
10000, the platform will repartition POJOEnricher 
+to balance throughput of a single partition to be between 10000 and 30000.
+CooldownMillis of 10000 will be used as the threshold time for which the 
throughout change is observed.</p>
+<h5 id="latency-based">Latency based</h5>
+<p>Following code can be added to populateDAG method of application to 
dynamically partitioning POJOEnricher:</p>
+<pre><code class="java">    
StatelessLatencyBasedPartitioner&lt;POJOEnricher&gt; partitioner = new 
StatelessLatencyBasedPartitioner&lt;&gt;();
+    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+    partitioner.setMaximumLatency(conf.getLong(MAX_THROUGHPUT, 10));
+    partitioner.setMinimumLatency(conf.getLong(MIN_THROUGHPUT, 3));
+    dag.setAttribute(pojoEnricherObj, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{partitioner}));
+    dag.setAttribute(pojoEnricherObj, OperatorContext.PARTITIONER, 
partitioner);
+</code></pre>
+
+<p>Above code will dynamically partition POJOEnricher when the overall latency 
of POJOEnricher changes.
+If the overall latency of POJOEnricher goes beyond 10 ms or less than 3 ms, 
the platform will repartition POJOEnricher 
+to balance latency of a single partition to be between 3 ms and 10 ms.
+CooldownMillis of 10000 will be used as the threshold time for which the 
latency change is observed.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../fsInputOperator/" class="btn btn-neutral float-right" 
title="File Input">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../deduper/" class="btn btn-neutral" title="Deduper"><span 
class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org";>MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme";>theme</a> provided by <a 
href="https://readthedocs.org";>Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../deduper/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../fsInputOperator/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/afbb4705/content/docs/malhar-3.7/operators/file_output/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar-3.7/operators/file_output/index.html 
b/content/docs/malhar-3.7/operators/file_output/index.html
new file mode 100644
index 0000000..2f98981
--- /dev/null
+++ b/content/docs/malhar-3.7/operators/file_output/index.html
@@ -0,0 +1,483 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>File Output - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link 
href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700'
 rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "File Output";
+    var mkdocs_page_input_path = "operators/file_output.md";
+    var mkdocs_page_url = "/operators/file_output/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar 
Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" 
method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" 
aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>APIs</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../../apis/calcite/">SQL</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">CSV Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">CSV Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">File Output</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a 
href="#abstractfileoutputoperator">AbstractFileOutputOperator</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#persisting-data-to-files">Persisting data to files</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#automatic-rotation">Automatic rotation</a></li>
+                
+                    <li><a class="toctree-l4" 
href="#fault-tolerance">Fault-tolerance</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../AbstractJdbcTransactionableOutputOperator/">Jdbc 
Output Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jdbcPollInputOperator/">JDBC Poller Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">JSON Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">JSON Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transformer</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../xmlParserOperator/">XML Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvformatter/">Csv Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../s3outputmodule/">S3 Output Module</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>File Output</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 
id="abstractfileoutputoperator">AbstractFileOutputOperator</h1>
+<p>The abstract file output operator in Apache Apex Malhar library &mdash; <a 
href="https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java";><code>AbstractFileOutputOperator</code></a>
 writes streaming data to files. The main features of this operator are:</p>
+<ol>
+<li>Persisting data to files.</li>
+<li>Automatic rotation of files based on:<br />
+  a. maximum length of a file.<br />
+  b. time-based rotation where time is specified using a count of application 
windows.</li>
+<li>Fault-tolerance.</li>
+<li>Compression and encryption of data before it is persisted.</li>
+</ol>
+<p>In this tutorial we will cover the details of the basic structure and 
implementation of all the above features in 
<code>AbstractFileOutputOperator</code>. Configuration items related to each 
feature are discussed as they are introduced in the section of that feature.</p>
+<h2 id="persisting-data-to-files">Persisting data to files</h2>
+<p>The principal function of this operator is to persist tuples to files 
efficiently. These files are created under a specific directory on the file 
system. The relevant configuration item is:</p>
+<p><strong>filePath</strong>: path specifying the directory where files are 
written.</p>
+<p>Different types of file system that are implementations of 
<code>org.apache.hadoop.fs.FileSystem</code> are supported. The file system 
instance which is used for creating streams is constructed from the 
<code>filePath</code> URI.</p>
+<pre><code class="java">FileSystem.newInstance(new Path(filePath).toUri(), new 
Configuration())
+</code></pre>
+
+<p>Tuples may belong to different files therefore expensive IO operations like 
creating multiple output streams, flushing of data to disk, and closing streams 
are handled carefully.</p>
+<h3 id="ports">Ports</h3>
+<ul>
+<li><code>input</code>: the input port on which tuples to be persisted are 
received.</li>
+</ul>
+<h3 id="streamscache"><code>streamsCache</code></h3>
+<p>This transient state caches output streams per file in memory. The file to 
which the data is appended may change with incoming tuples. It will be highly 
inefficient to keep re-opening streams for a file just because tuples for that 
file are interleaved with tuples for another file. Therefore, the operator 
maintains a cache of limited size with open output streams.</p>
+<p><code>streamsCache</code> is of type 
<code>com.google.common.cache.LoadingCache</code>. A <code>LoadingCache</code> 
has an attached <code>CacheLoader</code> which is responsible to load value of 
a key when the key is not present in the cache. Details are explained here- <a 
href="https://github.com/google/guava/wiki/CachesExplained";>CachesExplained</a>.</p>
+<p>The operator constructs this cache in <code>setup(...)</code>. It is built 
with the following configuration items:</p>
+<ul>
+<li><strong>maxOpenFiles</strong>: maximum size of the cache. The cache evicts 
entries that haven't been used recently when the cache size is approaching this 
limit. <em>Default</em>: 100</li>
+<li><strong>expireStreamAfterAcessMillis</strong>: expires streams after the 
specified duration has passed since the stream was last accessed. 
<em>Default</em>: value of attribute- 
<code>OperatorContext.SPIN_MILLIS</code>.</li>
+</ul>
+<p>An important point to note here is that the guava cache does not perform 
cleanup and evict values asynchronously, that is, instantly after a value 
expires. Instead, it performs small amounts of maintenance during write 
operations, or during occasional read operations if writes are rare.</p>
+<h4 id="cacheloader">CacheLoader</h4>
+<p><code>streamsCache</code> is created with a <code>CacheLoader</code> that 
opens an <code>FSDataOutputStream</code> for a file which is not in the cache. 
The output stream is opened in either <code>append</code> or 
<code>create</code> mode and the basic logic to determine this is explained by 
the simple diagram below.</p>
+<p><img alt="Opening an output stream" src="../images/fileoutput/diagram1.png" 
/></p>
+<p>This process gets complicated when fault-tolerance (writing to temporary 
files)  and rotation is added.</p>
+<p>Following are few configuration items used for opening the streams:</p>
+<ul>
+<li><strong>replication</strong>: specifies the replication factor of the 
output files. <em>Default</em>: <code>fs.getDefaultReplication(new 
Path(filePath))</code></li>
+<li><strong>filePermission</strong>: specifies the permission of the output 
files. The permission is an octal number similar to that used by the Unix chmod 
command. <em>Default</em>: 0777</li>
+</ul>
+<h4 id="removallistener">RemovalListener</h4>
+<p>A <code>Guava</code> cache also allows specification of removal listener 
which can perform some operation when an entry is removed from the cache. Since 
<code>streamsCache</code> is of limited size and also has time-based expiry 
enabled, it is imperative that when a stream is evicted from the cache it is 
closed properly. Therefore, we attach a removal listener to 
<code>streamsCache</code> which closes the stream when it is evicted.</p>
+<h3 id="setupoperatorcontext-context"><code>setup(OperatorContext 
context)</code></h3>
+<p>During setup the following main tasks are performed:</p>
+<ol>
+<li>FileSystem instance is created.</li>
+<li>The cache of streams is created.</li>
+<li>Files are recovered (see Fault-tolerance section).</li>
+<li>Stray part files are cleaned (see Automatic rotation section).</li>
+</ol>
+<h3 id="processtupleinput-tuple"><a 
name="processTuple"></a><code>processTuple(INPUT tuple)</code></h3>
+<p>The code snippet below highlights the basic steps of processing a tuple.</p>
+<pre><code class="java">protected void processTuple(INPUT tuple)
+{  
+  //which file to write to is derived from the tuple.
+  String fileName = getFileName(tuple);  
+
+  //streamsCache is queried for the output stream. If the stream is already 
opened then it is returned immediately otherwise the cache loader creates one.
+  FilterOutputStream fsOutput = streamsCache.get(fileName).getFilterStream();
+
+  byte[] tupleBytes = getBytesForTuple(tuple);
+
+  fsOutput.write(tupleBytes);
+}
+</code></pre>
+
+<h3 id="endwindow"><a name="endWindow"></a>endWindow()</h3>
+<p>It should be noted that while processing a tuple we do not flush the stream 
after every write. Since flushing is expensive it is done periodically for all 
the open streams in the operator's <code>endWindow()</code>.</p>
+<pre><code class="java">Map&lt;String, FSFilterStreamContext&gt; openStreams = 
streamsCache.asMap();
+for (FSFilterStreamContext streamContext: openStreams.values()) {
+  ...
+  //this flushes the stream
+  streamContext.finalizeContext();
+  ...
+}
+</code></pre>
+
+<p><code>FSFilterStreamContext</code> will be explained with compression and 
encryption.</p>
+<h3 id="teardown"><a name="teardown"></a>teardown()</h3>
+<p>When any operator in a DAG fails then the application master invokes 
<code>teardown()</code> for that operator and its downstream operators. In 
<code>AbstractFileOutputOperator</code> we have a bunch of open streams in the 
cache and the operator (acting as HDFS client) holds leases for all the 
corresponding files. It is important to release these leases for clean 
re-deployment. Therefore, we try to close all the open streams in 
<code>teardown()</code>.</p>
+<h2 id="automatic-rotation">Automatic rotation</h2>
+<p>In a streaming application where data is being continuously processed, when 
this output operator is used, data will be continuously written to an output 
file. The users may want to be able to take the data from time to time to use 
it, copy it out of Hadoop or do some other processing. Having all the data in a 
single file makes it difficult as the user needs to keep track of how much data 
has been read from the file each time so that the same data is not read again. 
Also users may already have processes and scripts in place that work with full 
files and not partial data from a file.</p>
+<p>To help solve these problems the operator supports creating many smaller 
files instead of writing to just one big file. Data is written to a file and 
when some condition is met the file is finalized and data is written to a new 
file. This is called file rotation. The user can determine when the file gets 
rotated. Each of these files is called a part file as they contain portion of 
the data.</p>
+<h3 id="part-filename">Part filename</h3>
+<p>The filename for a part file is formed by using the original file name and 
the part number. The part number starts from 0 and is incremented each time a 
new part file created. The default filename has the format, assuming origfile 
represents the original filename and partnum represents the part number,</p>
+<p><code>origfile.partnum</code></p>
+<p>This naming scheme can be changed by the user. It can be done so by 
overriding the following method</p>
+<pre><code class="java">protected String getPartFileName(String fileName, int 
part)
+</code></pre>
+
+<p>This method is passed the original filename and part number as arguments 
and should return the part filename.</p>
+<h3 id="mechanisms">Mechanisms</h3>
+<p>The user has a couple of ways to specify when a file gets rotated. First is 
based on size and second on time. In the first case the files are limited by 
size and in the second they are rotated by time.</p>
+<h4 id="size-based">Size Based</h4>
+<p>With size based rotation the user specifies a size limit. Once the size of 
the currently file reaches this limit the file is rotated. The size limit can 
be specified by setting the following property</p>
+<p><code>maxLength</code></p>
+<p>Like any other property this can be set in Java application code or in the 
property file.</p>
+<h4 id="time-based">Time Based</h4>
+<p>In time based rotation user specifies a time interval. This interval is 
specified as number of application windows. The files are rotated periodically 
once the specified number of application windows have elapsed. Since the 
interval is application window based it is not always exactly constant time. 
The interval can be specified using the following property</p>
+<p><code>rotationWindows</code></p>
+<h3 id="setupoperatorcontext-context_1"><code>setup(OperatorContext 
context)</code></h3>
+<p>When an operator is being started there may be stray part files and they 
need to be cleaned up. One common scenario, when these could be present, is in 
the case of failure, where a node running the operator failed and a previous 
instance of the operator was killed. This cleanup and other initial processing 
for the part files happens in the operator setup. The following diagram 
describes this process</p>
+<p><img alt="Rotation setup" src="../images/fileoutput/FileRotation.png" /></p>
+<h2 id="fault-tolerance">Fault-tolerance</h2>
+<p>There are two issues that should be addressed in order to make the operator 
fault-tolerant:</p>
+<ol>
+<li>
+<p>The operator flushes data to the filesystem every application window. This 
implies that after a failure when the operator is re-deployed and tuples of a 
window are replayed, then duplicate data will be saved to the files. This is 
handled by recording how much the operator has written to each file every 
window in a state that is checkpointed and truncating files back to the 
recovery checkpoint after re-deployment.</p>
+</li>
+<li>
+<p>While writing to HDFS, if the operator gets killed and didn't have the 
opportunity to close a file, then later when it is redeployed it will attempt 
to truncate/restore that file. Restoring a file may fail because the lease that 
the previous process (operator instance before failure) had acquired from 
namenode to write to a file may still linger and therefore there can be 
exceptions in acquiring the lease again by the new process (operator instance 
after failure). This is handled by always writing data to temporary files and 
renaming these files to actual files when a file is finalized (closed) for 
writing, that is, we are sure that no more data will be written to it. The 
relevant configuration item is:  </p>
+</li>
+<li><strong>alwaysWriteToTmp</strong>: enables/disables writing to a temporary 
file. <em>Default</em>: true.</li>
+</ol>
+<p>Most of the complexity in the code comes from making this operator 
fault-tolerant.</p>
+<h3 id="checkpointed-states-needed-for-fault-tolerance">Checkpointed states 
needed for fault-tolerance</h3>
+<ul>
+<li>
+<p><code>endOffsets</code>: contains the size of each file as it is being 
updated by the operator. It helps the operator to restore a file during 
recovery in operator <code>setup(...)</code> and is also used while loading a 
stream to find out if the operator has seen a file before.</p>
+</li>
+<li>
+<p><code>fileNameToTmpName</code>: contains the name of the temporary file per 
actual file. It is needed because the name of a temporary file is random. They 
are named based on the timestamp when the stream is created. During recovery 
the operator needs to know the temp file which it was writing to and if it 
needs restoration then it creates a new temp file and updates this mapping.</p>
+</li>
+<li>
+<p><code>finalizedFiles</code>: contains set of files which were requested to 
be finalized per window id.</p>
+</li>
+<li>
+<p><code>finalizedPart</code>: contains the latest <code>part</code> of each 
file which was requested to be finalized.</p>
+</li>
+</ul>
+<p>The use of <code>finalizedFiles</code> and <code>finalizedPart</code> are 
explained in detail under <a 
href="#requestFinalize"><code>requestFinalize(...)</code></a> method.</p>
+<h3 id="recovering-files">Recovering files</h3>
+<p>When the operator is re-deployed, it checks in its <code>setup(...)</code> 
method if the state of a file which it has seen before the failure is 
consistent with the file's state on the file system, that is, the size of the 
file on the file system should match the size in the <code>endOffsets</code>. 
When it doesn't the operator truncates the file.</p>
+<p>For example, let's say the operator wrote 100 bytes to test1.txt by the end 
of window 10. It wrote another 20 bytes by the end of window 12 but failed in 
window 13. When the operator gets re-deployed it is restored with window 10 
(recovery checkpoint) state. In the previous run, by the end of window 10, the 
size of file on the filesystem was 100 bytes but now it is 120 bytes. Tuples 
for windows 11 and 12 are going to be replayed. Therefore, in order to avoid 
writing duplicates to test1.txt, the operator truncates the file to 100 bytes 
(size at the end of window 10) discarding the last 20 bytes.</p>
+<h3 id="requestfinalizestring-filename"><a 
name="requestFinalize"></a><code>requestFinalize(String fileName)</code></h3>
+<p>When the operator is always writing to temporary files (in order to avoid 
HDFS Lease exceptions), then it is necessary to rename the temporary files to 
the actual files once it has been determined that the files are closed. This is 
refered to as <em>finalization</em> of files and the method allows the user 
code to specify when a file is ready for finalization.</p>
+<p>In this method, the requested file (or in the case of rotation &mdash; all 
the file parts including the latest open part which have not yet been requested 
for finalization) are registered for finalization. Registration is basically 
adding the file names to <code>finalizedFiles</code> state and updating 
<code>finalizedPart</code>.</p>
+<p>The process of <em>finalization</em> of all the files which were requested 
till the window <em>w</em> is deferred till window <em>w</em> is committed. 
This is because until a window is committed it can be replayed after a failure 
which means that a file can be open for writing even after it was requested for 
finalization.</p>
+<p>When rotation is enabled, part files as and when they get completed are 
requested for finalization. However, when rotation is not enabled user code 
needs to invoke this method as the knowledge that when a file is closed is 
unknown to this abstract operator.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer 
navigation">
+      
+        <a href="../file_splitter/" class="btn btn-neutral float-right" 
title="File Splitter">Next <span class="icon 
icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../fsInputOperator/" class="btn btn-neutral" title="File 
Input"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org";>MkDocs</a> using a <a 
href="https://github.com/snide/sphinx_rtd_theme";>theme</a> provided by <a 
href="https://readthedocs.org";>Read the Docs</a>.
+</footer>
+         
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../fsInputOperator/" style="color: #fcfcfc;">&laquo; 
Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../file_splitter/" 
style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

Reply via email to