http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24ffdb6e/contrib/src/test/resources/com/datatorrent/contrib/romesyndication/datatorrent_feed_updated.rss
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/resources/com/datatorrent/contrib/romesyndication/datatorrent_feed_updated.rss
 
b/contrib/src/test/resources/com/datatorrent/contrib/romesyndication/datatorrent_feed_updated.rss
index 2403483..5e8f52a 100644
--- 
a/contrib/src/test/resources/com/datatorrent/contrib/romesyndication/datatorrent_feed_updated.rss
+++ 
b/contrib/src/test/resources/com/datatorrent/contrib/romesyndication/datatorrent_feed_updated.rss
@@ -94,41 +94,41 @@
 <ul>
 <li><strong>A JSON Schema:</strong> The JSON schema specifies the keys, 
aggregates, aggregators, dimension combinations, and time buckets to be used 
for Dimension Computation. An example of a schema that could be used for 
<strong>AdEvents</strong> is the following:</li>
 </ul>
-<pre class="prettyprint"><code class=" hljs json">{"<span 
class="hljs-attribute">keys</span>":<span class="hljs-value">[{"<span 
class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"advertiser"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"string"</span></span>},
-         {"<span class="hljs-attribute">name</span>":<span 
class="hljs-value"><span class="hljs-string">"location"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"string"</span></span>}]</span>,
- "<span class="hljs-attribute">timeBuckets</span>":<span 
class="hljs-value">[<span class="hljs-string">"1m"</span>,<span 
class="hljs-string">"1h"</span>,<span class="hljs-string">"1d"</span>]</span>,
- "<span class="hljs-attribute">values</span>":
-  <span class="hljs-value">[{"<span class="hljs-attribute">name</span>":<span 
class="hljs-value"><span class="hljs-string">"impressions"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"long"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
-   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"clicks"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"long"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
-   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"cost"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"double"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
-   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"revenue"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"double"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>}]</span>,
- "<span class="hljs-attribute">dimensions</span>":
-  <span class="hljs-value">[{"<span 
class="hljs-attribute">combination</span>":<span class="hljs-value">[]</span>},
-   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"location"</span>]</span>},
-   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"advertiser"</span>]</span>},
-   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"advertiser"</span>,<span 
class="hljs-string">"location"</span>]</span>}]
+<pre class="prettyprint"><code class=" hljs json">{"<span 
class="hljs-attribute">keys</span>":<span class="hljs-value">[{"<span 
class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"advertiser"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"string"</span></span>},
+         {"<span class="hljs-attribute">name</span>":<span 
class="hljs-value"><span class="hljs-string">"location"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"string"</span></span>}]</span>,
+ "<span class="hljs-attribute">timeBuckets</span>":<span 
class="hljs-value">[<span class="hljs-string">"1m"</span>,<span 
class="hljs-string">"1h"</span>,<span class="hljs-string">"1d"</span>]</span>,
+ "<span class="hljs-attribute">values</span>":
+  <span class="hljs-value">[{"<span class="hljs-attribute">name</span>":<span 
class="hljs-value"><span class="hljs-string">"impressions"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"long"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
+   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"clicks"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"long"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
+   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"cost"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"double"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>},
+   {"<span class="hljs-attribute">name</span>":<span class="hljs-value"><span 
class="hljs-string">"revenue"</span></span>,"<span 
class="hljs-attribute">type</span>":<span class="hljs-value"><span 
class="hljs-string">"double"</span></span>,"<span 
class="hljs-attribute">aggregators</span>":<span class="hljs-value">[<span 
class="hljs-string">"SUM"</span>,<span class="hljs-string">"MAX"</span>,<span 
class="hljs-string">"MIN"</span>]</span>}]</span>,
+ "<span class="hljs-attribute">dimensions</span>":
+  <span class="hljs-value">[{"<span 
class="hljs-attribute">combination</span>":<span class="hljs-value">[]</span>},
+   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"location"</span>]</span>},
+   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"advertiser"</span>]</span>},
+   {"<span class="hljs-attribute">combination</span>":<span 
class="hljs-value">[<span class="hljs-string">"advertiser"</span>,<span 
class="hljs-string">"location"</span>]</span>}]
 </span>}</code></pre>
 <ul>
 <li>A map from key names to the Java expression used to extract the key from 
an incoming POJO.</li>
 <li>A map from aggregate names to the Java expression used to extract the 
aggregate from an incoming POJO.</li>
 </ul>
 <p>An example of how to configure a Dimensions Computation operator to process 
<strong>AdEvents</strong> is as follows:</p>
-<pre class="prettyprint"><code class=" hljs 
avrasm">DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"DimensionsComputation"</span>, 
DimensionsComputationFlexibleSingleSchemaPOJO<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-
-Map&lt;String, String&gt; keyToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
-keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"advertiser"</span>, <span 
class="hljs-string">"getAdvertiser()"</span>)<span class="hljs-comment">;</span>
-keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"location"</span>, <span 
class="hljs-string">"getLocation()"</span>)<span class="hljs-comment">;</span>
-keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"time"</span>, <span 
class="hljs-string">"getTime()"</span>)<span class="hljs-comment">;</span>
-
-Map&lt;String, String&gt; aggregateToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
-aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"cost"</span>, <span 
class="hljs-string">"getCost()"</span>)<span class="hljs-comment">;</span>
-aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"revenue"</span>, <span 
class="hljs-string">"getRevenue()"</span>)<span class="hljs-comment">;</span>
-aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"impressions"</span>, <span 
class="hljs-string">"getImpressions()"</span>)<span 
class="hljs-comment">;</span>
-aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"clicks"</span>, <span 
class="hljs-string">"getClicks()"</span>)<span class="hljs-comment">;</span>
-
-dimensions<span 
class="hljs-preprocessor">.setKeyToExpression</span>(keyToExpression)<span 
class="hljs-comment">;</span>
-dimensions<span 
class="hljs-preprocessor">.setAggregateToExpression</span>(aggregateToExpression)<span
 class="hljs-comment">;</span>
-//Here eventSchema is a string containing the JSON listed above.
+<pre class="prettyprint"><code class=" hljs 
avrasm">DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"DimensionsComputation"</span>, 
DimensionsComputationFlexibleSingleSchemaPOJO<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+
+Map&lt;String, String&gt; keyToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
+keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"advertiser"</span>, <span 
class="hljs-string">"getAdvertiser()"</span>)<span class="hljs-comment">;</span>
+keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"location"</span>, <span 
class="hljs-string">"getLocation()"</span>)<span class="hljs-comment">;</span>
+keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"time"</span>, <span 
class="hljs-string">"getTime()"</span>)<span class="hljs-comment">;</span>
+
+Map&lt;String, String&gt; aggregateToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
+aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"cost"</span>, <span 
class="hljs-string">"getCost()"</span>)<span class="hljs-comment">;</span>
+aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"revenue"</span>, <span 
class="hljs-string">"getRevenue()"</span>)<span class="hljs-comment">;</span>
+aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"impressions"</span>, <span 
class="hljs-string">"getImpressions()"</span>)<span 
class="hljs-comment">;</span>
+aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"clicks"</span>, <span 
class="hljs-string">"getClicks()"</span>)<span class="hljs-comment">;</span>
+
+dimensions<span 
class="hljs-preprocessor">.setKeyToExpression</span>(keyToExpression)<span 
class="hljs-comment">;</span>
+dimensions<span 
class="hljs-preprocessor">.setAggregateToExpression</span>(aggregateToExpression)<span
 class="hljs-comment">;</span>
+//Here eventSchema is a string containing the JSON listed above.
 dimensions<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span></code></pre>
 <h3 id="the-unification-phase">The Unification Phase</h3>
 <h4 id="the-theory-1">The Theory</h4>
@@ -158,89 +158,89 @@ dimensions<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eve
 <li>Setting an HDFS path for storing aggregation data.</li>
 </ul>
 <p>An example of configuring the store is as follows:</p>
-<pre class="prettyprint"><code class=" hljs 
avrasm">AppDataSingleSchemaDimensionStoreHDHT store = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Store"</span>, AppDataSingleSchemaDimensionStoreHDHT<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-
-TFileImpl hdsFile = new TFileImpl<span 
class="hljs-preprocessor">.DTFileImpl</span>()<span 
class="hljs-comment">;</span>
-hdsFile<span class="hljs-preprocessor">.setBasePath</span>(basePath)<span 
class="hljs-comment">;</span>
-store<span class="hljs-preprocessor">.setFileStore</span>(hdsFile)<span 
class="hljs-comment">;</span>
-store<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
-
-String gatewayAddress = dag<span 
class="hljs-preprocessor">.getValue</span>(DAG<span 
class="hljs-preprocessor">.GATEWAY</span>_CONNECT_ADDRESS)<span 
class="hljs-comment">;</span>
-URI uri = URI<span class="hljs-preprocessor">.create</span>(<span 
class="hljs-string">"ws://"</span> + gatewayAddress + <span 
class="hljs-string">"/pubsub"</span>)<span class="hljs-comment">;</span>
-
-PubSubWebSocketAppDataQuery wsIn = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Query"</span>, PubSubWebSocketAppDataQuery<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-wsIn<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
-wsIn<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Query Topic"</span>)<span class="hljs-comment">;</span>
-
-PubSubWebSocketAppDataResult wsOut = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"QueryResult"</span>, PubSubWebSocketAppDataResult<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-wsOut<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
-wsOut<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Result Topic"</span>)<span class="hljs-comment">;</span>
-
-dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"Query"</span>, wsIn<span 
class="hljs-preprocessor">.outputPort</span>, store<span 
class="hljs-preprocessor">.query</span>)<span class="hljs-comment">;</span>
+<pre class="prettyprint"><code class=" hljs 
avrasm">AppDataSingleSchemaDimensionStoreHDHT store = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Store"</span>, AppDataSingleSchemaDimensionStoreHDHT<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+
+TFileImpl hdsFile = new TFileImpl<span 
class="hljs-preprocessor">.DTFileImpl</span>()<span 
class="hljs-comment">;</span>
+hdsFile<span class="hljs-preprocessor">.setBasePath</span>(basePath)<span 
class="hljs-comment">;</span>
+store<span class="hljs-preprocessor">.setFileStore</span>(hdsFile)<span 
class="hljs-comment">;</span>
+store<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
+
+String gatewayAddress = dag<span 
class="hljs-preprocessor">.getValue</span>(DAG<span 
class="hljs-preprocessor">.GATEWAY</span>_CONNECT_ADDRESS)<span 
class="hljs-comment">;</span>
+URI uri = URI<span class="hljs-preprocessor">.create</span>(<span 
class="hljs-string">"ws://"</span> + gatewayAddress + <span 
class="hljs-string">"/pubsub"</span>)<span class="hljs-comment">;</span>
+
+PubSubWebSocketAppDataQuery wsIn = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Query"</span>, PubSubWebSocketAppDataQuery<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+wsIn<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
+wsIn<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Query Topic"</span>)<span class="hljs-comment">;</span>
+
+PubSubWebSocketAppDataResult wsOut = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"QueryResult"</span>, PubSubWebSocketAppDataResult<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+wsOut<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
+wsOut<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Result Topic"</span>)<span class="hljs-comment">;</span>
+
+dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"Query"</span>, wsIn<span 
class="hljs-preprocessor">.outputPort</span>, store<span 
class="hljs-preprocessor">.query</span>)<span class="hljs-comment">;</span>
 dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"QueryResult"</span>, store<span 
class="hljs-preprocessor">.queryResult</span>, wsOut<span 
class="hljs-preprocessor">.input</span>)<span 
class="hljs-comment">;</span></code></pre>
 <h3 id="putting-it-all-together">Putting it all Together</h3>
 <p>When you combine all the pieces described above, an application that 
visualizes <strong>AdEvents</strong> looks like this:</p>
-<pre class="prettyprint"><code class=" hljs 
avrasm">@ApplicationAnnotation(name=<span 
class="hljs-string">"AdEventDemo"</span>)
-public class AdEventDemo implements StreamingApplication
-{
-  public static final String EVENT_SCHEMA = <span 
class="hljs-string">"adsGenericEventSchema.json"</span><span 
class="hljs-comment">;</span>
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    //This loads the eventSchema<span class="hljs-preprocessor">.json</span> 
file which is a jar resource file.
-    String eventSchema = SchemaUtils<span 
class="hljs-preprocessor">.jarResourceFileToString</span>(<span 
class="hljs-string">"eventSchema.json"</span>)<span 
class="hljs-comment">;</span>
-
-    //Operator that receives Ad Events
-    AdEventReceiver receiver = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span class="hljs-string">"Event 
Receiver"</span>, AdEventReceiver<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-
-    //Dimension Computation
-    DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"DimensionsComputation"</span>, 
DimensionsComputationFlexibleSingleSchemaPOJO<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-
-    Map&lt;String, String&gt; keyToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
-    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"advertiser"</span>, <span 
class="hljs-string">"getAdvertiser()"</span>)<span class="hljs-comment">;</span>
-    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"location"</span>, <span 
class="hljs-string">"getLocation()"</span>)<span class="hljs-comment">;</span>
-    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"time"</span>, <span 
class="hljs-string">"getTime()"</span>)<span class="hljs-comment">;</span>
-
-    Map&lt;String, String&gt; aggregateToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
-    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"cost"</span>, <span 
class="hljs-string">"getCost()"</span>)<span class="hljs-comment">;</span>
-    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"revenue"</span>, <span 
class="hljs-string">"getRevenue()"</span>)<span class="hljs-comment">;</span>
-    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"impressions"</span>, <span 
class="hljs-string">"getImpressions()"</span>)<span 
class="hljs-comment">;</span>
-    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"clicks"</span>, <span 
class="hljs-string">"getClicks()"</span>)<span class="hljs-comment">;</span>
-
-    dimensions<span 
class="hljs-preprocessor">.setKeyToExpression</span>(keyToExpression)<span 
class="hljs-comment">;</span>
-    dimensions<span 
class="hljs-preprocessor">.setAggregateToExpression</span>(aggregateToExpression)<span
 class="hljs-comment">;</span>
-    dimensions<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
-
-    dimensions<span class="hljs-preprocessor">.setUnifier</span>(new 
DimensionsComputationUnifierImpl&lt;InputEvent, Aggregate&gt;())<span 
class="hljs-comment">;</span>
-
-    //Dimension Store
-    AppDataSingleSchemaDimensionStoreHDHT store = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Store"</span>, AppDataSingleSchemaDimensionStoreHDHT<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-
-    TFileImpl hdsFile = new TFileImpl<span 
class="hljs-preprocessor">.DTFileImpl</span>()<span 
class="hljs-comment">;</span>
-    hdsFile<span class="hljs-preprocessor">.setBasePath</span>(<span 
class="hljs-string">"dataStorePath"</span>)<span class="hljs-comment">;</span>
-    store<span class="hljs-preprocessor">.setFileStore</span>(hdsFile)<span 
class="hljs-comment">;</span>
-    store<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
-
-    String gatewayAddress = dag<span 
class="hljs-preprocessor">.getValue</span>(DAG<span 
class="hljs-preprocessor">.GATEWAY</span>_CONNECT_ADDRESS)<span 
class="hljs-comment">;</span>
-    URI uri = URI<span class="hljs-preprocessor">.create</span>(<span 
class="hljs-string">"ws://"</span> + gatewayAddress + <span 
class="hljs-string">"/pubsub"</span>)<span class="hljs-comment">;</span>
-
-    PubSubWebSocketAppDataQuery wsIn = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Query"</span>, PubSubWebSocketAppDataQuery<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-    wsIn<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
-    wsIn<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Query Topic"</span>)<span class="hljs-comment">;</span>
-
-    PubSubWebSocketAppDataResult wsOut = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"QueryResult"</span>, PubSubWebSocketAppDataResult<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
-    wsOut<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
-    wsOut<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Result Topic"</span>)<span class="hljs-comment">;</span>
-
-    //Configure Streams
-
-    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"Query"</span>, wsIn<span 
class="hljs-preprocessor">.outputPort</span>, store<span 
class="hljs-preprocessor">.query</span>)<span class="hljs-comment">;</span>
-    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"QueryResult"</span>, store<span 
class="hljs-preprocessor">.queryResult</span>, wsOut<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
-
-    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"InputStream"</span>, receiver<span 
class="hljs-preprocessor">.output</span>, dimensions<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
-    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"DimensionalData"</span>, dimensions<span 
class="hljs-preprocessor">.output</span>, store<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
-  }
+<pre class="prettyprint"><code class=" hljs 
avrasm">@ApplicationAnnotation(name=<span 
class="hljs-string">"AdEventDemo"</span>)
+public class AdEventDemo implements StreamingApplication
+{
+  public static final String EVENT_SCHEMA = <span 
class="hljs-string">"adsGenericEventSchema.json"</span><span 
class="hljs-comment">;</span>
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    //This loads the eventSchema<span class="hljs-preprocessor">.json</span> 
file which is a jar resource file.
+    String eventSchema = SchemaUtils<span 
class="hljs-preprocessor">.jarResourceFileToString</span>(<span 
class="hljs-string">"eventSchema.json"</span>)<span 
class="hljs-comment">;</span>
+
+    //Operator that receives Ad Events
+    AdEventReceiver receiver = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span class="hljs-string">"Event 
Receiver"</span>, AdEventReceiver<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+
+    //Dimension Computation
+    DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"DimensionsComputation"</span>, 
DimensionsComputationFlexibleSingleSchemaPOJO<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+
+    Map&lt;String, String&gt; keyToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
+    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"advertiser"</span>, <span 
class="hljs-string">"getAdvertiser()"</span>)<span class="hljs-comment">;</span>
+    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"location"</span>, <span 
class="hljs-string">"getLocation()"</span>)<span class="hljs-comment">;</span>
+    keyToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"time"</span>, <span 
class="hljs-string">"getTime()"</span>)<span class="hljs-comment">;</span>
+
+    Map&lt;String, String&gt; aggregateToExpression = Maps<span 
class="hljs-preprocessor">.newHashMap</span>()<span 
class="hljs-comment">;</span>
+    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"cost"</span>, <span 
class="hljs-string">"getCost()"</span>)<span class="hljs-comment">;</span>
+    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"revenue"</span>, <span 
class="hljs-string">"getRevenue()"</span>)<span class="hljs-comment">;</span>
+    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"impressions"</span>, <span 
class="hljs-string">"getImpressions()"</span>)<span 
class="hljs-comment">;</span>
+    aggregateToExpression<span class="hljs-preprocessor">.put</span>(<span 
class="hljs-string">"clicks"</span>, <span 
class="hljs-string">"getClicks()"</span>)<span class="hljs-comment">;</span>
+
+    dimensions<span 
class="hljs-preprocessor">.setKeyToExpression</span>(keyToExpression)<span 
class="hljs-comment">;</span>
+    dimensions<span 
class="hljs-preprocessor">.setAggregateToExpression</span>(aggregateToExpression)<span
 class="hljs-comment">;</span>
+    dimensions<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
+
+    dimensions<span class="hljs-preprocessor">.setUnifier</span>(new 
DimensionsComputationUnifierImpl&lt;InputEvent, Aggregate&gt;())<span 
class="hljs-comment">;</span>
+
+    //Dimension Store
+    AppDataSingleSchemaDimensionStoreHDHT store = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Store"</span>, AppDataSingleSchemaDimensionStoreHDHT<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+
+    TFileImpl hdsFile = new TFileImpl<span 
class="hljs-preprocessor">.DTFileImpl</span>()<span 
class="hljs-comment">;</span>
+    hdsFile<span class="hljs-preprocessor">.setBasePath</span>(<span 
class="hljs-string">"dataStorePath"</span>)<span class="hljs-comment">;</span>
+    store<span class="hljs-preprocessor">.setFileStore</span>(hdsFile)<span 
class="hljs-comment">;</span>
+    store<span 
class="hljs-preprocessor">.setConfigurationSchemaJSON</span>(eventSchema)<span 
class="hljs-comment">;</span>
+
+    String gatewayAddress = dag<span 
class="hljs-preprocessor">.getValue</span>(DAG<span 
class="hljs-preprocessor">.GATEWAY</span>_CONNECT_ADDRESS)<span 
class="hljs-comment">;</span>
+    URI uri = URI<span class="hljs-preprocessor">.create</span>(<span 
class="hljs-string">"ws://"</span> + gatewayAddress + <span 
class="hljs-string">"/pubsub"</span>)<span class="hljs-comment">;</span>
+
+    PubSubWebSocketAppDataQuery wsIn = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"Query"</span>, PubSubWebSocketAppDataQuery<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+    wsIn<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
+    wsIn<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Query Topic"</span>)<span class="hljs-comment">;</span>
+
+    PubSubWebSocketAppDataResult wsOut = dag<span 
class="hljs-preprocessor">.addOperator</span>(<span 
class="hljs-string">"QueryResult"</span>, PubSubWebSocketAppDataResult<span 
class="hljs-preprocessor">.class</span>)<span class="hljs-comment">;</span>
+    wsOut<span class="hljs-preprocessor">.setUri</span>(uri)<span 
class="hljs-comment">;</span>
+    wsOut<span class="hljs-preprocessor">.setTopic</span>(<span 
class="hljs-string">"Result Topic"</span>)<span class="hljs-comment">;</span>
+
+    //Configure Streams
+
+    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"Query"</span>, wsIn<span 
class="hljs-preprocessor">.outputPort</span>, store<span 
class="hljs-preprocessor">.query</span>)<span class="hljs-comment">;</span>
+    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"QueryResult"</span>, store<span 
class="hljs-preprocessor">.queryResult</span>, wsOut<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
+
+    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"InputStream"</span>, receiver<span 
class="hljs-preprocessor">.output</span>, dimensions<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
+    dag<span class="hljs-preprocessor">.addStream</span>(<span 
class="hljs-string">"DimensionalData"</span>, dimensions<span 
class="hljs-preprocessor">.output</span>, store<span 
class="hljs-preprocessor">.input</span>)<span class="hljs-comment">;</span>
+  }
 }</code></pre>
 <h3> </h3>
 <h3 id="visualizing-the-aggregations">Visualizing The Aggregations</h3>
@@ -276,22 +276,22 @@ public class AdEventDemo implements StreamingApplication
 <p>Let us continue with our example of an advertising publisher. Let us now 
see the steps that the publisher might take to ensure that large volumes of raw 
advertisement data is converted into meaningful historical views of their 
advertisement events.</p>
 <h3 id="the-data">The Data</h3>
 <p>Typically advertising publishers receive packets of information for each 
advertising event. The events that a publisher receives might look like 
this:</p>
-<pre class="prettyprint"><code class=" hljs cs">    <span 
class="hljs-keyword">public</span> <span class="hljs-keyword">class</span> 
AdEvent
-    {
-        <span class="hljs-comment">//The name of the company that is 
advertising</span>
-      <span class="hljs-keyword">public</span> String advertiser;
-      <span class="hljs-comment">//The geographical location of the person 
initiating the event</span>
-      <span class="hljs-keyword">public</span> String location;
-      <span class="hljs-comment">//How much the advertiser was charged for the 
event</span>
-      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">double</span> cost;
-      <span class="hljs-comment">//How much revenue was generated for the 
advertiser</span>
-      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">double</span> revenue;
-      <span class="hljs-comment">//The number of impressions the advertiser 
received from this event</span>
-      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> impressions;
-      <span class="hljs-comment">//The number of clicks the advertiser 
received from this event</span>
-      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> clicks;
-      <span class="hljs-comment">//The timestamp of the event in 
milliseconds</span>
-      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> time;
+<pre class="prettyprint"><code class=" hljs cs">    <span 
class="hljs-keyword">public</span> <span class="hljs-keyword">class</span> 
AdEvent
+    {
+        <span class="hljs-comment">//The name of the company that is 
advertising</span>
+      <span class="hljs-keyword">public</span> String advertiser;
+      <span class="hljs-comment">//The geographical location of the person 
initiating the event</span>
+      <span class="hljs-keyword">public</span> String location;
+      <span class="hljs-comment">//How much the advertiser was charged for the 
event</span>
+      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">double</span> cost;
+      <span class="hljs-comment">//How much revenue was generated for the 
advertiser</span>
+      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">double</span> revenue;
+      <span class="hljs-comment">//The number of impressions the advertiser 
received from this event</span>
+      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> impressions;
+      <span class="hljs-comment">//The number of clicks the advertiser 
received from this event</span>
+      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> clicks;
+      <span class="hljs-comment">//The timestamp of the event in 
milliseconds</span>
+      <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">long</span> time;
     }</code></pre>
 <p>The class <strong>AdEvent</strong> contains two types of data:</p>
 <ul>
@@ -449,25 +449,25 @@ public class AdEventDemo implements StreamingApplication
 <p>This creates a directory called <strong>wordcount</strong>, with a sample 
application and build script. Let us see how to modify this application into 
the Scala-based word count application that we are looking to develop.</p>
 <h3 id="add-the-scala-build-plugin">Add the Scala build plugin.</h3>
 <p>Apache Apex uses maven for building the framework and operator library. 
Maven supports a plugin for compiling Scala files. To enable this plugin, add 
the following snippet to the <code>build -&gt; plugins</code> sections of the 
<code>pom.xml</code> file that is located in the application directory.</p>
-<pre class="prettyprint"><code class="language-xml hljs ">  <span 
class="hljs-tag">&lt;<span class="hljs-title">plugin</span>&gt;</span>
-    <span class="hljs-tag">&lt;<span 
class="hljs-title">groupId</span>&gt;</span>net.alchim31.maven<span 
class="hljs-tag">&lt;/<span class="hljs-title">groupId</span>&gt;</span>
-    <span class="hljs-tag">&lt;<span 
class="hljs-title">artifactId</span>&gt;</span>scala-maven-plugin<span 
class="hljs-tag">&lt;/<span class="hljs-title">artifactId</span>&gt;</span>
-    <span class="hljs-tag">&lt;<span 
class="hljs-title">version</span>&gt;</span>3.2.1<span 
class="hljs-tag">&lt;/<span class="hljs-title">version</span>&gt;</span>
-    <span class="hljs-tag">&lt;<span 
class="hljs-title">executions</span>&gt;</span>
-      <span class="hljs-tag">&lt;<span 
class="hljs-title">execution</span>&gt;</span>
-        <span class="hljs-tag">&lt;<span 
class="hljs-title">goals</span>&gt;</span>
-          <span class="hljs-tag">&lt;<span 
class="hljs-title">goal</span>&gt;</span>compile<span 
class="hljs-tag">&lt;/<span class="hljs-title">goal</span>&gt;</span>
-          <span class="hljs-tag">&lt;<span 
class="hljs-title">goal</span>&gt;</span>testCompile<span 
class="hljs-tag">&lt;/<span class="hljs-title">goal</span>&gt;</span>
-        <span class="hljs-tag">&lt;/<span 
class="hljs-title">goals</span>&gt;</span>
-      <span class="hljs-tag">&lt;/<span 
class="hljs-title">execution</span>&gt;</span>
-    <span class="hljs-tag">&lt;/<span 
class="hljs-title">executions</span>&gt;</span>
+<pre class="prettyprint"><code class="language-xml hljs ">  <span 
class="hljs-tag">&lt;<span class="hljs-title">plugin</span>&gt;</span>
+    <span class="hljs-tag">&lt;<span 
class="hljs-title">groupId</span>&gt;</span>net.alchim31.maven<span 
class="hljs-tag">&lt;/<span class="hljs-title">groupId</span>&gt;</span>
+    <span class="hljs-tag">&lt;<span 
class="hljs-title">artifactId</span>&gt;</span>scala-maven-plugin<span 
class="hljs-tag">&lt;/<span class="hljs-title">artifactId</span>&gt;</span>
+    <span class="hljs-tag">&lt;<span 
class="hljs-title">version</span>&gt;</span>3.2.1<span 
class="hljs-tag">&lt;/<span class="hljs-title">version</span>&gt;</span>
+    <span class="hljs-tag">&lt;<span 
class="hljs-title">executions</span>&gt;</span>
+      <span class="hljs-tag">&lt;<span 
class="hljs-title">execution</span>&gt;</span>
+        <span class="hljs-tag">&lt;<span 
class="hljs-title">goals</span>&gt;</span>
+          <span class="hljs-tag">&lt;<span 
class="hljs-title">goal</span>&gt;</span>compile<span 
class="hljs-tag">&lt;/<span class="hljs-title">goal</span>&gt;</span>
+          <span class="hljs-tag">&lt;<span 
class="hljs-title">goal</span>&gt;</span>testCompile<span 
class="hljs-tag">&lt;/<span class="hljs-title">goal</span>&gt;</span>
+        <span class="hljs-tag">&lt;/<span 
class="hljs-title">goals</span>&gt;</span>
+      <span class="hljs-tag">&lt;/<span 
class="hljs-title">execution</span>&gt;</span>
+    <span class="hljs-tag">&lt;/<span 
class="hljs-title">executions</span>&gt;</span>
   <span class="hljs-tag">&lt;/<span 
class="hljs-title">plugin</span>&gt;</span></code></pre>
 <p>Also, specify the Scala library as a dependency in the pom.xml file.<br />
 Add the Scala library.</p>
-<pre class="prettyprint"><code class="language-xml hljs "><span 
class="hljs-tag">&lt;<span class="hljs-title">dependency</span>&gt;</span>
- <span class="hljs-tag">&lt;<span 
class="hljs-title">groupId</span>&gt;</span>org.scala-lang<span 
class="hljs-tag">&lt;/<span class="hljs-title">groupId</span>&gt;</span>
- <span class="hljs-tag">&lt;<span 
class="hljs-title">artifactId</span>&gt;</span>scala-library<span 
class="hljs-tag">&lt;/<span class="hljs-title">artifactId</span>&gt;</span>
- <span class="hljs-tag">&lt;<span 
class="hljs-title">version</span>&gt;</span>2.11.2<span 
class="hljs-tag">&lt;/<span class="hljs-title">version</span>&gt;</span>
+<pre class="prettyprint"><code class="language-xml hljs "><span 
class="hljs-tag">&lt;<span class="hljs-title">dependency</span>&gt;</span>
+ <span class="hljs-tag">&lt;<span 
class="hljs-title">groupId</span>&gt;</span>org.scala-lang<span 
class="hljs-tag">&lt;/<span class="hljs-title">groupId</span>&gt;</span>
+ <span class="hljs-tag">&lt;<span 
class="hljs-title">artifactId</span>&gt;</span>scala-library<span 
class="hljs-tag">&lt;/<span class="hljs-title">artifactId</span>&gt;</span>
+ <span class="hljs-tag">&lt;<span 
class="hljs-title">version</span>&gt;</span>2.11.2<span 
class="hljs-tag">&lt;/<span class="hljs-title">version</span>&gt;</span>
 <span class="hljs-tag">&lt;/<span 
class="hljs-title">dependency</span>&gt;</span></code></pre>
 <p>We are now set to write a Scala application.</p>
 <h2 id="write-your-scala-word-count-application">Write your Scala word count 
application</h2>
@@ -476,78 +476,78 @@ Add the Scala library.</p>
 <em>readEntity</em> : Reads a line from a file.<br />
 <em>emit</em> : Emits data read on the output port.<br />
 We have overridden openFile to obtain an instance of BufferedReader that is 
required while reading lines from the file. We also override closeFile for 
closing an instance of BufferedReader.</p>
-<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">LineReader</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">AbstractFileInputOperator</span>[<span 
class="hljs-title">String</span>] {</span>
-
-  <span class="hljs-annotation">@transient</span>
-  <span class="hljs-keyword">val</span> out : DefaultOutputPort[String] = 
<span class="hljs-keyword">new</span> DefaultOutputPort[String]();
-
-  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> readEntity(): String = br.readLine()
-
-  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> emit(line: String): Unit = out.emit(line)
-
-  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> openFile(path: Path): InputStream = {
-    <span class="hljs-keyword">val</span> in = <span 
class="hljs-keyword">super</span>.openFile(path)
-    br = <span class="hljs-keyword">new</span> BufferedReader(<span 
class="hljs-keyword">new</span> InputStreamReader(in))
-    <span class="hljs-keyword">return</span> in
-  }
-
-  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> closeFile(is: InputStream): Unit = {
-    br.close()
-    <span class="hljs-keyword">super</span>.closeFile(is)
-  }
-
-  <span class="hljs-annotation">@transient</span>
-  <span class="hljs-keyword">private</span> <span 
class="hljs-keyword">var</span> br : BufferedReader = <span 
class="hljs-keyword">null</span>
+<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">LineReader</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">AbstractFileInputOperator</span>[<span 
class="hljs-title">String</span>] {</span>
+
+  <span class="hljs-annotation">@transient</span>
+  <span class="hljs-keyword">val</span> out : DefaultOutputPort[String] = 
<span class="hljs-keyword">new</span> DefaultOutputPort[String]();
+
+  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> readEntity(): String = br.readLine()
+
+  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> emit(line: String): Unit = out.emit(line)
+
+  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> openFile(path: Path): InputStream = {
+    <span class="hljs-keyword">val</span> in = <span 
class="hljs-keyword">super</span>.openFile(path)
+    br = <span class="hljs-keyword">new</span> BufferedReader(<span 
class="hljs-keyword">new</span> InputStreamReader(in))
+    <span class="hljs-keyword">return</span> in
+  }
+
+  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> closeFile(is: InputStream): Unit = {
+    br.close()
+    <span class="hljs-keyword">super</span>.closeFile(is)
+  }
+
+  <span class="hljs-annotation">@transient</span>
+  <span class="hljs-keyword">private</span> <span 
class="hljs-keyword">var</span> br : BufferedReader = <span 
class="hljs-keyword">null</span>
 }</code></pre>
 <p>Some Apex API classes are not serializable, and must be defined as 
transient. Scala supports transient annotation for such scenarios. If you see 
objects that are not a part of the state of the operator, you must annotate 
them with a @transient. For example, in this code, we have annotated buffer 
reader and output port as transient.</p>
 <h3 id="parser">Parser</h3>
 <p>Parser splits lines using in-built JAVA split function, and emits 
individual words to the output port.</p>
-<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">Parser</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">BaseOperator</span> {</span>
-  <span class="hljs-annotation">@BeanProperty</span>
-  <span class="hljs-keyword">var</span> regex : String = <span 
class="hljs-string">" "</span>
-
-  <span class="hljs-annotation">@transient</span>
-  <span class="hljs-keyword">val</span> out = <span 
class="hljs-keyword">new</span> DefaultOutputPort[String]()
-
-  <span class="hljs-annotation">@transient</span>
-  <span class="hljs-keyword">val</span> in = <span 
class="hljs-keyword">new</span> DefaultInputPort[String]() {
-    <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> process(t: String): Unit = {
-      <span class="hljs-keyword">for</span>(w &lt;- t.split(regex)) out.emit(w)
-    }
-  }
+<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">Parser</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">BaseOperator</span> {</span>
+  <span class="hljs-annotation">@BeanProperty</span>
+  <span class="hljs-keyword">var</span> regex : String = <span 
class="hljs-string">" "</span>
+
+  <span class="hljs-annotation">@transient</span>
+  <span class="hljs-keyword">val</span> out = <span 
class="hljs-keyword">new</span> DefaultOutputPort[String]()
+
+  <span class="hljs-annotation">@transient</span>
+  <span class="hljs-keyword">val</span> in = <span 
class="hljs-keyword">new</span> DefaultInputPort[String]() {
+    <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> process(t: String): Unit = {
+      <span class="hljs-keyword">for</span>(w &lt;- t.split(regex)) out.emit(w)
+    }
+  }
 }</code></pre>
 <p>Scala simplifies automatic generation of setters and getters based on the 
@BinProperty annotation. Properties annotated with @BinProperty can be modified 
at the time of launching an application by using configuration files. You can 
also modify such properties while an application is running. You can specify 
the regular expression used for splitting within the configuration file.</p>
 <h3 id="uniquecount-and-consoeloutputoperator">UniqueCount and 
ConsoelOutputOperator</h3>
 <p>For this application, let us use UniqueCount and ConsoleOutputOperator as 
is.</p>
 <h3 id="put-together-the-word-count-application">Put together the word count 
application</h3>
 <p>Writing the main application class in Scala is similar to doing it in JAVA. 
You must first get an instance of DAG object by overriding the populateDAG() 
method. Later, you must add operators to this instance using the addOperator() 
method. Finally, you must connect the operators with the addStream() method.</p>
-<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-annotation">@ApplicationAnnotation</span>(name=<span 
class="hljs-string">"WordCount"</span>)
-<span class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">Application</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">StreamingApplication</span> {</span>
-  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> populateDAG(dag: DAG, configuration: 
Configuration): Unit = {
-    <span class="hljs-keyword">val</span> input = dag.addOperator(<span 
class="hljs-string">"input"</span>, <span class="hljs-keyword">new</span> 
LineReader)
-    <span class="hljs-keyword">val</span> parser = dag.addOperator(<span 
class="hljs-string">"parser"</span>, <span class="hljs-keyword">new</span> 
Parser)
-    <span class="hljs-keyword">val</span> counter = dag.addOperator(<span 
class="hljs-string">"counter"</span>, <span class="hljs-keyword">new</span> 
UniqueCounter[String])
-    <span class="hljs-keyword">val</span> out = dag.addOperator(<span 
class="hljs-string">"console"</span>, <span class="hljs-keyword">new</span> 
ConsoleOutputOperator)
-
-    dag.addStream[String](<span class="hljs-string">"lines"</span>, input.out, 
parser.in)
-    dag.addStream[String](<span class="hljs-string">"words"</span>, 
parser.out, counter.data)
-    dag.addStream[java.util.HashMap[String,Integer]](<span 
class="hljs-string">"counts"</span>, counter.count, out.input)
-  }
+<pre class="prettyprint"><code class="language-scala hljs "><span 
class="hljs-annotation">@ApplicationAnnotation</span>(name=<span 
class="hljs-string">"WordCount"</span>)
+<span class="hljs-class"><span class="hljs-keyword">class</span> <span 
class="hljs-title">Application</span> <span class="hljs-keyword">extends</span> 
<span class="hljs-title">StreamingApplication</span> {</span>
+  <span class="hljs-keyword">override</span> <span 
class="hljs-keyword">def</span> populateDAG(dag: DAG, configuration: 
Configuration): Unit = {
+    <span class="hljs-keyword">val</span> input = dag.addOperator(<span 
class="hljs-string">"input"</span>, <span class="hljs-keyword">new</span> 
LineReader)
+    <span class="hljs-keyword">val</span> parser = dag.addOperator(<span 
class="hljs-string">"parser"</span>, <span class="hljs-keyword">new</span> 
Parser)
+    <span class="hljs-keyword">val</span> counter = dag.addOperator(<span 
class="hljs-string">"counter"</span>, <span class="hljs-keyword">new</span> 
UniqueCounter[String])
+    <span class="hljs-keyword">val</span> out = dag.addOperator(<span 
class="hljs-string">"console"</span>, <span class="hljs-keyword">new</span> 
ConsoleOutputOperator)
+
+    dag.addStream[String](<span class="hljs-string">"lines"</span>, input.out, 
parser.in)
+    dag.addStream[String](<span class="hljs-string">"words"</span>, 
parser.out, counter.data)
+    dag.addStream[java.util.HashMap[String,Integer]](<span 
class="hljs-string">"counts"</span>, counter.count, out.input)
+  }
 }</code></pre>
 <h2 id="running-application">Running application</h2>
 <p>Before running the word count application, specify the input directory for 
the input operator. You can use the default configuration file for this. Open 
the <em>src/main/resources/META-INF/properties.xml</em> file, and add the 
following lines between the tag. Do not forget to replace 
“username” with your Hadoop username.</p>
-<pre class="prettyprint"><code class="language-xml hljs "><span 
class="hljs-tag">&lt;<span class="hljs-title">property</span>&gt;</span>
- <span class="hljs-tag">&lt;<span 
class="hljs-title">name</span>&gt;</span>dt.application.WordCount.operator.input.prop.directory<span
 class="hljs-tag">&lt;/<span class="hljs-title">name</span>&gt;</span>
-  <span class="hljs-tag">&lt;<span 
class="hljs-title">value</span>&gt;</span>/user/username/data<span 
class="hljs-tag">&lt;/<span class="hljs-title">value</span>&gt;</span>
+<pre class="prettyprint"><code class="language-xml hljs "><span 
class="hljs-tag">&lt;<span class="hljs-title">property</span>&gt;</span>
+ <span class="hljs-tag">&lt;<span 
class="hljs-title">name</span>&gt;</span>dt.application.WordCount.operator.input.prop.directory<span
 class="hljs-tag">&lt;/<span class="hljs-title">name</span>&gt;</span>
+  <span class="hljs-tag">&lt;<span 
class="hljs-title">value</span>&gt;</span>/user/username/data<span 
class="hljs-tag">&lt;/<span class="hljs-title">value</span>&gt;</span>
 <span class="hljs-tag">&lt;/<span 
class="hljs-title">property</span>&gt;</span></code></pre>
 <p>Build the application from the application directory using this command:</p>
 <pre class="prettyprint"><code class="language-bash hljs ">mvn clean install 
-DskipTests</code></pre>
 <p>You should now have an application package in the target directory.</p>
 <p>Now, launch this application package using dtcli.</p>
-<pre class="prettyprint"><code class="language-bash hljs ">$ dtcli
-DT CLI <span class="hljs-number">3.2</span>.<span 
class="hljs-number">0</span>-SNAPSHOT <span 
class="hljs-number">28.09</span>.<span class="hljs-number">2015</span> @ <span 
class="hljs-number">12</span>:<span class="hljs-number">45</span>:<span 
class="hljs-number">15</span> IST rev: <span class="hljs-number">8</span>e49cfb 
branch: devel-<span class="hljs-number">3</span>
-dt&gt; launch target/wordcount-<span 
class="hljs-number">1.0</span>-SNAPSHOT.apa
-{<span class="hljs-string">"appId"</span>: <span 
class="hljs-string">"application_1443354392775_0010"</span>}
+<pre class="prettyprint"><code class="language-bash hljs ">$ dtcli
+DT CLI <span class="hljs-number">3.2</span>.<span 
class="hljs-number">0</span>-SNAPSHOT <span 
class="hljs-number">28.09</span>.<span class="hljs-number">2015</span> @ <span 
class="hljs-number">12</span>:<span class="hljs-number">45</span>:<span 
class="hljs-number">15</span> IST rev: <span class="hljs-number">8</span>e49cfb 
branch: devel-<span class="hljs-number">3</span>
+dt&gt; launch target/wordcount-<span 
class="hljs-number">1.0</span>-SNAPSHOT.apa
+{<span class="hljs-string">"appId"</span>: <span 
class="hljs-string">"application_1443354392775_0010"</span>}
 dt (application_1443354392775_0010) &gt;</code></pre>
 <p>Add some text files to the <em>/user/username/data</em> directory on your 
HDFS to see how the application works. You can see the words along with their 
counts in the container log of the console operator.</p>
 <h2 id="summary">Summary</h2>
@@ -651,43 +651,43 @@ To get started with creating your first application, see 
<a href="https://www.da
 <li>Basic health checks of the cluster</li>
 </ul>
 <p>Here is an example of using the curl command to access dtGateway’s REST 
API to get the details of a physical operator with ID=40 of application 
instance with ID=application_1442448722264_14891, assuming dtGateway is 
listening at localhost:9090:</p>
-<pre class="prettyprint"><code class="language-bash hljs ">$ curl 
http://localhost:<span 
class="hljs-number">9090</span>/ws/v2/applications/application_1442448722264_14891/physicalPlan/operators/<span
 class="hljs-number">40</span>
-{
-    <span class="hljs-string">"checkpointStartTime"</span>: <span 
class="hljs-string">"1442512091772"</span>, 
-    <span class="hljs-string">"checkpointTime"</span>: <span 
class="hljs-string">"175"</span>, 
-    <span class="hljs-string">"checkpointTimeMA"</span>: <span 
class="hljs-string">"164"</span>, 
-    <span class="hljs-string">"className"</span>: <span 
class="hljs-string">"com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator"</span>,
 
-    <span class="hljs-string">"container"</span>: <span 
class="hljs-string">"container_e08_1442448722264_14891_01_000017"</span>, 
-    <span class="hljs-string">"counters"</span>: null, 
-    <span class="hljs-string">"cpuPercentageMA"</span>: <span 
class="hljs-string">"0.2039266316727741"</span>, 
-    <span class="hljs-string">"currentWindowId"</span>: <span 
class="hljs-string">"6195527785184762469"</span>, 
-    <span class="hljs-string">"failureCount"</span>: <span 
class="hljs-string">"0"</span>, 
-    <span class="hljs-string">"host"</span>: <span 
class="hljs-string">"node22.morado.com:8041"</span>, 
-    <span class="hljs-string">"id"</span>: <span 
class="hljs-string">"40"</span>, 
-    <span class="hljs-string">"lastHeartbeat"</span>: <span 
class="hljs-string">"1442512100742"</span>, 
-    <span class="hljs-string">"latencyMA"</span>: <span 
class="hljs-string">"5"</span>, 
-    <span class="hljs-string">"logicalName"</span>: <span 
class="hljs-string">"QueryResult"</span>, 
-    <span class="hljs-string">"metrics"</span>: {}, 
-    <span class="hljs-string">"name"</span>: <span 
class="hljs-string">"QueryResult"</span>, 
-    <span class="hljs-string">"ports"</span>: [
-        {
-            <span class="hljs-string">"bufferServerBytesPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
-            <span class="hljs-string">"name"</span>: <span 
class="hljs-string">"inputPort"</span>, 
-            <span class="hljs-string">"queueSizeMA"</span>: <span 
class="hljs-string">"1"</span>, 
-            <span class="hljs-string">"recordingId"</span>: null, 
-            <span class="hljs-string">"totalTuples"</span>: <span 
class="hljs-string">"6976"</span>, 
-            <span class="hljs-string">"tuplesPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
-            <span class="hljs-string">"type"</span>: <span 
class="hljs-string">"input"</span>
-        }
-    ], 
-    <span class="hljs-string">"recordingId"</span>: null, 
-    <span class="hljs-string">"recoveryWindowId"</span>: <span 
class="hljs-string">"6195527785184762451"</span>, 
-    <span class="hljs-string">"status"</span>: <span 
class="hljs-string">"ACTIVE"</span>, 
-    <span class="hljs-string">"totalTuplesEmitted"</span>: <span 
class="hljs-string">"0"</span>, 
-    <span class="hljs-string">"totalTuplesProcessed"</span>: <span 
class="hljs-string">"6976"</span>, 
-    <span class="hljs-string">"tuplesEmittedPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
-    <span class="hljs-string">"tuplesProcessedPSMA"</span>: <span 
class="hljs-string">"20"</span>, 
-    <span class="hljs-string">"unifierClass"</span>: null
+<pre class="prettyprint"><code class="language-bash hljs ">$ curl 
http://localhost:<span 
class="hljs-number">9090</span>/ws/v2/applications/application_1442448722264_14891/physicalPlan/operators/<span
 class="hljs-number">40</span>
+{
+    <span class="hljs-string">"checkpointStartTime"</span>: <span 
class="hljs-string">"1442512091772"</span>, 
+    <span class="hljs-string">"checkpointTime"</span>: <span 
class="hljs-string">"175"</span>, 
+    <span class="hljs-string">"checkpointTimeMA"</span>: <span 
class="hljs-string">"164"</span>, 
+    <span class="hljs-string">"className"</span>: <span 
class="hljs-string">"com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator"</span>,
 
+    <span class="hljs-string">"container"</span>: <span 
class="hljs-string">"container_e08_1442448722264_14891_01_000017"</span>, 
+    <span class="hljs-string">"counters"</span>: null, 
+    <span class="hljs-string">"cpuPercentageMA"</span>: <span 
class="hljs-string">"0.2039266316727741"</span>, 
+    <span class="hljs-string">"currentWindowId"</span>: <span 
class="hljs-string">"6195527785184762469"</span>, 
+    <span class="hljs-string">"failureCount"</span>: <span 
class="hljs-string">"0"</span>, 
+    <span class="hljs-string">"host"</span>: <span 
class="hljs-string">"node22.morado.com:8041"</span>, 
+    <span class="hljs-string">"id"</span>: <span 
class="hljs-string">"40"</span>, 
+    <span class="hljs-string">"lastHeartbeat"</span>: <span 
class="hljs-string">"1442512100742"</span>, 
+    <span class="hljs-string">"latencyMA"</span>: <span 
class="hljs-string">"5"</span>, 
+    <span class="hljs-string">"logicalName"</span>: <span 
class="hljs-string">"QueryResult"</span>, 
+    <span class="hljs-string">"metrics"</span>: {}, 
+    <span class="hljs-string">"name"</span>: <span 
class="hljs-string">"QueryResult"</span>, 
+    <span class="hljs-string">"ports"</span>: [
+        {
+            <span class="hljs-string">"bufferServerBytesPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
+            <span class="hljs-string">"name"</span>: <span 
class="hljs-string">"inputPort"</span>, 
+            <span class="hljs-string">"queueSizeMA"</span>: <span 
class="hljs-string">"1"</span>, 
+            <span class="hljs-string">"recordingId"</span>: null, 
+            <span class="hljs-string">"totalTuples"</span>: <span 
class="hljs-string">"6976"</span>, 
+            <span class="hljs-string">"tuplesPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
+            <span class="hljs-string">"type"</span>: <span 
class="hljs-string">"input"</span>
+        }
+    ], 
+    <span class="hljs-string">"recordingId"</span>: null, 
+    <span class="hljs-string">"recoveryWindowId"</span>: <span 
class="hljs-string">"6195527785184762451"</span>, 
+    <span class="hljs-string">"status"</span>: <span 
class="hljs-string">"ACTIVE"</span>, 
+    <span class="hljs-string">"totalTuplesEmitted"</span>: <span 
class="hljs-string">"0"</span>, 
+    <span class="hljs-string">"totalTuplesProcessed"</span>: <span 
class="hljs-string">"6976"</span>, 
+    <span class="hljs-string">"tuplesEmittedPSMA"</span>: <span 
class="hljs-string">"0"</span>, 
+    <span class="hljs-string">"tuplesProcessedPSMA"</span>: <span 
class="hljs-string">"20"</span>, 
+    <span class="hljs-string">"unifierClass"</span>: null
 }</code></pre>
 <p>For the complete spec of the REST API, please refer to our dtGateway REST 
API documentation <a 
href="https://www.datatorrent.com/docs/guides/DTGatewayAPISpecification.html"; 
target="_blank">here</a>.</p>
 <p>With great power comes great responsibility. With all the information 
dtGateway has and what dtGateway can do, the admin of DataTorrent RTS may want 
to restrict access to certain information and operations to only certain group 
of users. This means dtGateway must support authentication and 
authorization.</p>
@@ -735,77 +735,77 @@ In addition, we provide access control with granularity 
to the application insta
 <p>Let’s consider the example of the WordCount application, which is the 
de-facto hello world application of Hadoop. Here is how this simple, sequential 
DAG will look: The input operator reads a file to emit lines. The “lines” 
act as a stream, which in turn becomes the input for the parser operator. The 
parser operator performs a parse operation to generate words for the counter 
operator. The counter operator emits tuples (word, count) to the console. </p>
 <p><img 
src="https://www.datatorrent.com/wp-content/uploads/2015/10/wordcount-dag1.png"; 
alt="WordCount DAG" title=""></p>
 <p>The source for the logical plan can be in different formats. Using the Apex 
Java API, the WordCount example could look like this:</p>
-<pre class="prettyprint"><code class="language-java hljs "><span 
class="hljs-annotation">@ApplicationAnnotation</span>(name=<span 
class="hljs-string">"MyFirstApplication"</span>)
-<span class="hljs-keyword">public</span> <span class="hljs-class"><span 
class="hljs-keyword">class</span> <span class="hljs-title">Application</span> 
<span class="hljs-keyword">implements</span> <span 
class="hljs-title">StreamingApplication</span>
-{</span>
-  <span class="hljs-annotation">@Override</span>
-  <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">void</span> <span 
class="hljs-title">populateDAG</span>(DAG dag, Configuration conf)
-  {
-    LineReader lineReader = dag.addOperator(<span 
class="hljs-string">"input"</span>, <span class="hljs-keyword">new</span> 
LineReader());
-    Parser parser = dag.addOperator(<span class="hljs-string">"parser"</span>, 
<span class="hljs-keyword">new</span> Parser());
-    UniqueCounter&lt;String&gt; counter = dag.addOperator(<span 
class="hljs-string">"counter"</span>, <span class="hljs-keyword">new</span> 
UniqueCounter&lt;String&gt;());
-    ConsoleOutputOperator cons = dag.addOperator(<span 
class="hljs-string">"console"</span>, <span class="hljs-keyword">new</span> 
ConsoleOutputOperator());
-    dag.addStream(<span class="hljs-string">"lines"</span>, lineReader.output, 
parser.input);
-    dag.addStream(<span class="hljs-string">"words"</span>, parser.output, 
counter.data);
-    dag.addStream(<span class="hljs-string">"counts"</span>, counter.count, 
cons.input);
-  }
+<pre class="prettyprint"><code class="language-java hljs "><span 
class="hljs-annotation">@ApplicationAnnotation</span>(name=<span 
class="hljs-string">"MyFirstApplication"</span>)
+<span class="hljs-keyword">public</span> <span class="hljs-class"><span 
class="hljs-keyword">class</span> <span class="hljs-title">Application</span> 
<span class="hljs-keyword">implements</span> <span 
class="hljs-title">StreamingApplication</span>
+{</span>
+  <span class="hljs-annotation">@Override</span>
+  <span class="hljs-keyword">public</span> <span 
class="hljs-keyword">void</span> <span 
class="hljs-title">populateDAG</span>(DAG dag, Configuration conf)
+  {
+    LineReader lineReader = dag.addOperator(<span 
class="hljs-string">"input"</span>, <span class="hljs-keyword">new</span> 
LineReader());
+    Parser parser = dag.addOperator(<span class="hljs-string">"parser"</span>, 
<span class="hljs-keyword">new</span> Parser());
+    UniqueCounter&lt;String&gt; counter = dag.addOperator(<span 
class="hljs-string">"counter"</span>, <span class="hljs-keyword">new</span> 
UniqueCounter&lt;String&gt;());
+    ConsoleOutputOperator cons = dag.addOperator(<span 
class="hljs-string">"console"</span>, <span class="hljs-keyword">new</span> 
ConsoleOutputOperator());
+    dag.addStream(<span class="hljs-string">"lines"</span>, lineReader.output, 
parser.input);
+    dag.addStream(<span class="hljs-string">"words"</span>, parser.output, 
counter.data);
+    dag.addStream(<span class="hljs-string">"counts"</span>, counter.count, 
cons.input);
+  }
 }</code></pre>
 <p>The same WordCount application can be specified through JSON format 
(typically generated by a tool, such as the DataTorrent RTS application builder 
known as dtAssemble):</p>
-<pre class="prettyprint"><code class="language-json hljs ">{
-  "<span class="hljs-attribute">displayName</span>": <span 
class="hljs-value"><span class="hljs-string">"WordCountJSON"</span></span>,
-  "<span class="hljs-attribute">operators</span>": <span class="hljs-value">[
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
-      ...
-    },
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"parse"</span></span>,
-      ...
-    },
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"count"</span></span>,
-      "<span class="hljs-attribute">class</span>": <span 
class="hljs-value"><span 
class="hljs-string">"com.datatorrent.lib.algo.UniqueCounter"</span></span>,
-      "<span class="hljs-attribute">properties</span>": <span 
class="hljs-value">{
-        "<span 
class="hljs-attribute">com.datatorrent.lib.algo.UniqueCounter</span>": <span 
class="hljs-value">{
-          "<span class="hljs-attribute">cumulative</span>": <span 
class="hljs-value"><span class="hljs-literal">false</span>
-        </span>}
-      </span>}
-    </span>},
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"console"</span></span>,
-      ...
-    }
-  ]</span>,
-  "<span class="hljs-attribute">streams</span>": <span class="hljs-value">[
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"lines"</span></span>,
-      "<span class="hljs-attribute">sinks</span>": <span class="hljs-value">[
-        {
-          "<span class="hljs-attribute">operatorName</span>": <span 
class="hljs-value"><span class="hljs-string">"parse"</span></span>,
-          "<span class="hljs-attribute">portName</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span>
-        </span>}
-      ]</span>,
-      "<span class="hljs-attribute">source</span>": <span class="hljs-value">{
-        "<span class="hljs-attribute">operatorName</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
-        "<span class="hljs-attribute">portName</span>": <span 
class="hljs-value"><span class="hljs-string">"output"</span>
-      </span>}
-    </span>},
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"words"</span></span>,
-      ...
-    },
-    {
-      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"counts"</span></span>,
-      ...
-    }
-  ]
+<pre class="prettyprint"><code class="language-json hljs ">{
+  "<span class="hljs-attribute">displayName</span>": <span 
class="hljs-value"><span class="hljs-string">"WordCountJSON"</span></span>,
+  "<span class="hljs-attribute">operators</span>": <span class="hljs-value">[
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
+      ...
+    },
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"parse"</span></span>,
+      ...
+    },
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"count"</span></span>,
+      "<span class="hljs-attribute">class</span>": <span 
class="hljs-value"><span 
class="hljs-string">"com.datatorrent.lib.algo.UniqueCounter"</span></span>,
+      "<span class="hljs-attribute">properties</span>": <span 
class="hljs-value">{
+        "<span 
class="hljs-attribute">com.datatorrent.lib.algo.UniqueCounter</span>": <span 
class="hljs-value">{
+          "<span class="hljs-attribute">cumulative</span>": <span 
class="hljs-value"><span class="hljs-literal">false</span>
+        </span>}
+      </span>}
+    </span>},
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"console"</span></span>,
+      ...
+    }
+  ]</span>,
+  "<span class="hljs-attribute">streams</span>": <span class="hljs-value">[
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"lines"</span></span>,
+      "<span class="hljs-attribute">sinks</span>": <span class="hljs-value">[
+        {
+          "<span class="hljs-attribute">operatorName</span>": <span 
class="hljs-value"><span class="hljs-string">"parse"</span></span>,
+          "<span class="hljs-attribute">portName</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span>
+        </span>}
+      ]</span>,
+      "<span class="hljs-attribute">source</span>": <span class="hljs-value">{
+        "<span class="hljs-attribute">operatorName</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
+        "<span class="hljs-attribute">portName</span>": <span 
class="hljs-value"><span class="hljs-string">"output"</span>
+      </span>}
+    </span>},
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"words"</span></span>,
+      ...
+    },
+    {
+      "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"counts"</span></span>,
+      ...
+    }
+  ]
 </span>}</code></pre>
 <p>As mentioned previously, the DAG can also be modified after an application 
was launched. In the following example we add another console operator to 
display the lines emitted by the input operator: </p>
-<pre class="prettyprint"><code class="language-bash hljs ">Connected to 
application application_1442901180806_0001
-dt (application_1442901180806_0001) &gt; begin-logical-plan-change 
-logical-plan-change (application_1442901180806_0001) &gt; create-operator 
linesConsole com.datatorrent.lib.io.ConsoleOutputOperator
-logical-plan-change (application_1442901180806_0001) &gt; add-stream-sink 
lines linesConsole input
-logical-plan-change (application_1442901180806_0001) &gt; submit 
+<pre class="prettyprint"><code class="language-bash hljs ">Connected to 
application application_1442901180806_0001
+dt (application_1442901180806_0001) &gt; begin-logical-plan-change 
+logical-plan-change (application_1442901180806_0001) &gt; create-operator 
linesConsole com.datatorrent.lib.io.ConsoleOutputOperator
+logical-plan-change (application_1442901180806_0001) &gt; add-stream-sink 
lines linesConsole input
+logical-plan-change (application_1442901180806_0001) &gt; submit 
 {}</code></pre>
 <p><img 
src="https://www.datatorrent.com/wp-content/uploads/2015/10/wordcount-dag2.png"; 
alt="Altered WordCount DAG" title=""></p>
 <h3 id="translation-of-logical-dag-into-physical-plan">Translation of logical 
DAG into physical plan</h3>
@@ -838,51 +838,51 @@ logical-plan-change (application_1442901180806_0001) &gt; 
submit
 <p>There are a few other things that happen between invocations of the user 
code, demarcated by windows. For example, checkpoints are periodically taken 
(every 30s by default, tunable by the user). There are also optional callbacks 
defined by <code>CheckpointListener</code> that can be used to implement 
synchronization with external systems (think database transactions or copy of 
finalized files, for example).</p>
 <h3 id="monitoring-the-execution">Monitoring the execution</h3>
 <p>Once the containers are fully provisioned, StrAM records the periodic 
heartbeat updates, and watches operator processing as data flows through the 
pipeline. StrAM does not contribute to the data flow itself, processing is 
decentralized and asynchronous. StrAM collects the stats from the heartbeats 
and uses them to provide the central view of the execution. For example, it 
calculates latency based on the window timestamps that are reported, which is 
vital in identifying processing bottlenecks. It also uses the window 
information to monitor the progress of operators and identify operators that 
are stuck (and when necessary restarts them, with an interval controllable by 
user). StrAM also hosts a REST API that clients such as the CLI can use to 
collect data. Here is an example for the information that can be obtained 
through this API:</p>
-<pre class="prettyprint"><code class="language-json hljs ">  {
-    "<span class="hljs-attribute">id</span>": <span class="hljs-value"><span 
class="hljs-string">"3"</span></span>,
-    "<span class="hljs-attribute">name</span>": <span class="hljs-value"><span 
class="hljs-string">"counter"</span></span>,
-    "<span class="hljs-attribute">className</span>": <span 
class="hljs-value"><span 
class="hljs-string">"com.datatorrent.lib.algo.UniqueCounter"</span></span>,
-    "<span class="hljs-attribute">container</span>": <span 
class="hljs-value"><span 
class="hljs-string">"container_1443668714920_0001_01_000003"</span></span>,
-    "<span class="hljs-attribute">host</span>": <span class="hljs-value"><span 
class="hljs-string">"localhost:8052"</span></span>,
-    "<span class="hljs-attribute">totalTuplesProcessed</span>": <span 
class="hljs-value"><span class="hljs-string">"198"</span></span>,
-    "<span class="hljs-attribute">totalTuplesEmitted</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
-    "<span class="hljs-attribute">tuplesProcessedPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-    "<span class="hljs-attribute">tuplesEmittedPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-    "<span class="hljs-attribute">cpuPercentageMA</span>": <span 
class="hljs-value"><span class="hljs-string">"1.5208279931258353"</span></span>,
-    "<span class="hljs-attribute">latencyMA</span>": <span 
class="hljs-value"><span class="hljs-string">"10"</span></span>,
-    "<span class="hljs-attribute">status</span>": <span 
class="hljs-value"><span class="hljs-string">"ACTIVE"</span></span>,
-    "<span class="hljs-attribute">lastHeartbeat</span>": <span 
class="hljs-value"><span class="hljs-string">"1443670671506"</span></span>,
-    "<span class="hljs-attribute">failureCount</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-    "<span class="hljs-attribute">recoveryWindowId</span>": <span 
class="hljs-value"><span 
class="hljs-string">"6200516265145009027"</span></span>,
-    "<span class="hljs-attribute">currentWindowId</span>": <span 
class="hljs-value"><span 
class="hljs-string">"6200516265145009085"</span></span>,
-    "<span class="hljs-attribute">ports</span>": <span class="hljs-value">[
-      {
-        "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"data"</span></span>,
-        "<span class="hljs-attribute">type</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
-        "<span class="hljs-attribute">totalTuples</span>": <span 
class="hljs-value"><span class="hljs-string">"198"</span></span>,
-        "<span class="hljs-attribute">tuplesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-        "<span class="hljs-attribute">bufferServerBytesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"16"</span></span>,
-        "<span class="hljs-attribute">queueSizeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
-        "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span>
-      </span>},
-      {
-        "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"count"</span></span>,
-        "<span class="hljs-attribute">type</span>": <span 
class="hljs-value"><span class="hljs-string">"output"</span></span>,
-        "<span class="hljs-attribute">totalTuples</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
-        "<span class="hljs-attribute">tuplesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-        "<span class="hljs-attribute">bufferServerBytesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"12"</span></span>,
-        "<span class="hljs-attribute">queueSizeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
-        "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span>
-      </span>}
-    ]</span>,
-    "<span class="hljs-attribute">unifierClass</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
-    "<span class="hljs-attribute">logicalName</span>": <span 
class="hljs-value"><span class="hljs-string">"counter"</span></span>,
-    "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
-    "<span class="hljs-attribute">counters</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
-    "<span class="hljs-attribute">metrics</span>": <span 
class="hljs-value">{}</span>,
-    "<span class="hljs-attribute">checkpointStartTime</span>": <span 
class="hljs-value"><span class="hljs-string">"1443670642472"</span></span>,
-    "<span class="hljs-attribute">checkpointTime</span>": <span 
class="hljs-value"><span class="hljs-string">"42"</span></span>,
-    "<span class="hljs-attribute">checkpointTimeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"129"</span>
+<pre class="prettyprint"><code class="language-json hljs ">  {
+    "<span class="hljs-attribute">id</span>": <span class="hljs-value"><span 
class="hljs-string">"3"</span></span>,
+    "<span class="hljs-attribute">name</span>": <span class="hljs-value"><span 
class="hljs-string">"counter"</span></span>,
+    "<span class="hljs-attribute">className</span>": <span 
class="hljs-value"><span 
class="hljs-string">"com.datatorrent.lib.algo.UniqueCounter"</span></span>,
+    "<span class="hljs-attribute">container</span>": <span 
class="hljs-value"><span 
class="hljs-string">"container_1443668714920_0001_01_000003"</span></span>,
+    "<span class="hljs-attribute">host</span>": <span class="hljs-value"><span 
class="hljs-string">"localhost:8052"</span></span>,
+    "<span class="hljs-attribute">totalTuplesProcessed</span>": <span 
class="hljs-value"><span class="hljs-string">"198"</span></span>,
+    "<span class="hljs-attribute">totalTuplesEmitted</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
+    "<span class="hljs-attribute">tuplesProcessedPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+    "<span class="hljs-attribute">tuplesEmittedPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+    "<span class="hljs-attribute">cpuPercentageMA</span>": <span 
class="hljs-value"><span class="hljs-string">"1.5208279931258353"</span></span>,
+    "<span class="hljs-attribute">latencyMA</span>": <span 
class="hljs-value"><span class="hljs-string">"10"</span></span>,
+    "<span class="hljs-attribute">status</span>": <span 
class="hljs-value"><span class="hljs-string">"ACTIVE"</span></span>,
+    "<span class="hljs-attribute">lastHeartbeat</span>": <span 
class="hljs-value"><span class="hljs-string">"1443670671506"</span></span>,
+    "<span class="hljs-attribute">failureCount</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+    "<span class="hljs-attribute">recoveryWindowId</span>": <span 
class="hljs-value"><span 
class="hljs-string">"6200516265145009027"</span></span>,
+    "<span class="hljs-attribute">currentWindowId</span>": <span 
class="hljs-value"><span 
class="hljs-string">"6200516265145009085"</span></span>,
+    "<span class="hljs-attribute">ports</span>": <span class="hljs-value">[
+      {
+        "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"data"</span></span>,
+        "<span class="hljs-attribute">type</span>": <span 
class="hljs-value"><span class="hljs-string">"input"</span></span>,
+        "<span class="hljs-attribute">totalTuples</span>": <span 
class="hljs-value"><span class="hljs-string">"198"</span></span>,
+        "<span class="hljs-attribute">tuplesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+        "<span class="hljs-attribute">bufferServerBytesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"16"</span></span>,
+        "<span class="hljs-attribute">queueSizeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
+        "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span>
+      </span>},
+      {
+        "<span class="hljs-attribute">name</span>": <span 
class="hljs-value"><span class="hljs-string">"count"</span></span>,
+        "<span class="hljs-attribute">type</span>": <span 
class="hljs-value"><span class="hljs-string">"output"</span></span>,
+        "<span class="hljs-attribute">totalTuples</span>": <span 
class="hljs-value"><span class="hljs-string">"1"</span></span>,
+        "<span class="hljs-attribute">tuplesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+        "<span class="hljs-attribute">bufferServerBytesPSMA</span>": <span 
class="hljs-value"><span class="hljs-string">"12"</span></span>,
+        "<span class="hljs-attribute">queueSizeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"0"</span></span>,
+        "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span>
+      </span>}
+    ]</span>,
+    "<span class="hljs-attribute">unifierClass</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
+    "<span class="hljs-attribute">logicalName</span>": <span 
class="hljs-value"><span class="hljs-string">"counter"</span></span>,
+    "<span class="hljs-attribute">recordingId</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
+    "<span class="hljs-attribute">counters</span>": <span 
class="hljs-value"><span class="hljs-literal">null</span></span>,
+    "<span class="hljs-attribute">metrics</span>": <span 
class="hljs-value">{}</span>,
+    "<span class="hljs-attribute">checkpointStartTime</span>": <span 
class="hljs-value"><span class="hljs-string">"1443670642472"</span></span>,
+    "<span class="hljs-attribute">checkpointTime</span>": <span 
class="hljs-value"><span class="hljs-string">"42"</span></span>,
+    "<span class="hljs-attribute">checkpointTimeMA</span>": <span 
class="hljs-value"><span class="hljs-string">"129"</span>
   </span>}</code></pre>
 <p>This blog covered the lifecycle of a DAG. Future posts will cover the 
inside view of the Apex engine, including checkpointing, processing semantics, 
partitioning and more. Watch this space! </p>
 <p>The post <a rel="nofollow" 
href="https://www.datatorrent.com/blog-tracing-dags-from-specification-to-execution/";>Tracing
 DAGs from specification to execution</a> appeared first on <a rel="nofollow" 
href="https://www.datatorrent.com";>DataTorrent</a>.</p>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24ffdb6e/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
----------------------------------------------------------------------
diff --git 
a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
 
b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
index 0d24638..d0b62ea 100644
--- 
a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
+++ 
b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
@@ -1,22 +1,22 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 /**
  * Iteration demonstration application.
  */
-package com.datatorrent.demos.iteration;
+package com.datatorrent.demos.iteration;

Reply via email to