http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/1052900a/docs/apex-3.2/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/apex-3.2/mkdocs/search_index.json b/docs/apex-3.2/mkdocs/search_index.json
deleted file mode 100644
index 836f785..0000000
--- a/docs/apex-3.2/mkdocs/search_index.json
+++ /dev/null
@@ -1,1004 +0,0 @@
-{
-    "docs": [
-        {
-            "location": "/", 
-            "text": "Apache Apex (Incubating)\n\n\nApex is a Hadoop YARN 
native big data processing platform, enabling real time stream as well as batch 
processing for your big data.  Apex provides the following 
benefits:\n\n\n\n\nHigh scalability and performance\n\n\nFault tolerance and 
state management\n\n\nHadoop-native YARN \n HDFS implementation\n\n\nEvent 
processing guarantees\n\n\nSeparation of functional and operational 
concerns\n\n\nSimple API supports generic Java code\n\n\n\n\nPlatform has been 
demonstrated to scale linearly across Hadoop clusters under extreme loads of 
billions of events per second.  Hardware and process failures are quickly 
recovered with HDFS-backed checkpointing and automatic operator recovery, 
preserving application state and resuming execution in seconds.  Functional and 
operational specifications are separated.  Apex provides a simple API, which 
enables users to write generic, reusable code.  The code is dropped in as-is 
and platform automatically handles the various operational concerns, such as 
state management, fault 
tolerance, scalability, security, metrics, etc.  This frees users to focus on 
functional development, and lets platform provide operability support.\n\n\nThe 
core Apex platform is supplemented by Malhar, a library of connector and logic 
functions, enabling rapid application development.  These operators and modules 
provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, ActiveMQ, 
RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, Redis, 
HBase, CouchDB, generic JDBC, and other database connectors.  In addition to 
the operators, the library contains a number of demo applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit \nApex Malhar on 
Github\n\n\nFor additional information visit \nApache Apex (incubating)\n.", 
-            "title": "Apache Apex"
-        }, 
-        {
-            "location": "/#apache-apex-incubating", 
-            "text": "Apex is a Hadoop YARN native big data processing 
platform, enabling real time stream as well as batch processing for your big 
data.  Apex provides the following benefits:   High scalability and performance 
 Fault tolerance and state management  Hadoop-native YARN   HDFS implementation 
 Event processing guarantees  Separation of functional and operational concerns 
 Simple API supports generic Java code   Platform has been demonstrated to scale 
linearly across Hadoop clusters under extreme loads of billions of events per 
second.  Hardware and process failures are quickly recovered with HDFS-backed 
checkpointing and automatic operator recovery, preserving application state and 
resuming execution in seconds.  Functional and operational specifications are 
separated.  Apex provides a simple API, which enables users to write generic, 
reusable code.  The code is dropped in as-is and platform automatically handles 
the various operational concerns, such as state management, fault tolerance, 
scalability, security, metrics, etc.  This frees users to 
focus on functional development, and lets platform provide operability support. 
 The core Apex platform is supplemented by Malhar, a library of connector and 
logic functions, enabling rapid application development.  These operators and 
modules provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, 
ActiveMQ, RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, 
Redis, HBase, CouchDB, generic JDBC, and other database connectors.  In 
addition to the operators, the library contains a number of demo applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit  Apex Malhar on Github  
For additional information visit  Apache Apex (incubating) .", 
-            "title": "Apache Apex (Incubating)"
-        }, 
-        {
-            "location": "/apex_development_setup/", 
-            "text": "Apache Apex Development Environment Setup\n\n\nThis 
document discusses the steps needed for setting up a development environment 
for creating applications that run on the Apache Apex 
platform.\n\n\nDevelopment Tools\n\n\nThere are a few tools that will be 
helpful when developing Apache Apex applications, including:\n\n\n\n\n\n\ngit\n 
- A revision control system (version 1.7.1 or later). There are multiple git 
clients available for Windows (\nhttp://git-scm.com/download/win\n for 
example), so download and install a client of your choice.\n\n\n\n\n\n\njava 
JDK\n (not JRE) - Includes the Java Runtime Environment as well as the Java 
compiler and a variety of tools (version 1.7.0_79 or later). Can be downloaded 
from the Oracle website.\n\n\n\n\n\n\nmaven\n - Apache Maven is a build system 
for Java projects (version 3.0.5 or later). It can be downloaded from 
\nhttps://maven.apache.org/download.cgi\n.\n\n\n\n\n\n\nIDE\n (Optional) - If 
you prefer to use an IDE (Integrated Development Environment) such as 
\nNetBeans\n, \nEclipse\n or 
\nIntelliJ\n, install that as well.\n\n\n\n\n\n\nAfter installing these tools, 
make sure that the directories containing the executable files are in your PATH 
environment variable.\n\n\n\n\nWindows\n - Open a console window and enter the 
command \necho %PATH%\n to see the value of the \nPATH\n variable and verify 
that the above directories for Java, git, and maven executables are present.  
For JDK executables like \njava\n and \njavac\n, the directory might be something 
like \nC:\\Program Files\\Java\\jdk1.7.0\\_80\\bin\n; for \ngit\n it might be 
\nC:\\Program Files\\Git\\bin\n; and for maven it might be 
\nC:\\Users\\user\\Software\\apache-maven-3.3.3\\bin\n.  If not, you can change 
its value by clicking on the button at \nControl Panel\n \n \nAdvanced System 
Settings\n \n \nAdvanced tab\n \n \nEnvironment Variables\n.\n\n\nLinux and 
Mac\n - Open a console/terminal window and enter the command \necho $PATH\n to 
see the value
  of the \nPATH\n variable and verify that the above directories for Java, git, 
and maven executables are present.  If not, make sure software is downloaded 
and installed, and optionally PATH reference is added and exported  in a 
\n~/.profile\n or \n~/.bash_profile\n.  For example to add maven located in 
\n/sfw/maven/apache-maven-3.3.3\n to PATH add the line: \nexport 
PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin\n\n\n\n\nConfirm by running the 
following commands and comparing with the output shown in the table 
below:\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nCommand\n\n\nOutput\n\n\n\n\n\n\njavac 
-version\n\n\njavac 1.7.0_80\n\n\n\n\n\n\njava -version\n\n\njava version 
\n1.7.0_80\n\n\nJava(TM) SE Runtime Environment (build 1.7.0_80-b15)\n\n\nJava 
HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)\n\n\n\n\n\n\ngit 
--version\n\n\ngit version 2.6.1.windows.1\n\n\n\n\n\n\nmvn 
--version\n\n\nApache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 
2015-04-22T06:57:37-05:00)\n\n\n...\n
 \n\n\n\n\n\n\n\n\n\n\nCreating New Apex Project\n\n\nAfter development tools 
are configured, you can now use the maven archetype to create a basic Apache 
Apex project.  \nNote:\n When executing the commands below, replace 
\n3.3.0-incubating\n by \nlatest available version\n of Apache 
Apex.\n\n\n\n\n\n\nWindows\n - Create a new Windows command file called 
\nnewapp.cmd\n by copying the lines below, and execute it.  When you run this 
file, the properties will be displayed and you will be prompted with \nY: :\n; 
just press \nEnter\n to complete the project generation.  The caret (^) at the 
end of some lines indicates that a continuation line follows. \n\n\n@echo 
off\n@rem Script for creating a new application\nsetlocal\nmvn 
archetype:generate ^\n -DarchetypeGroupId=org.apache.apex ^\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.3.0-incubating 
^\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp ^\n -Dversion=1.0-SNAPSHOT\nendlocal\n\n\n\n\n\n\nLinux\n 
- Execute the lines below in a terminal window.  New project will be created 
in the current working directory.  The backslash (\\) at the end of 
the lines indicates continuation.\n\n\nmvn archetype:generate \\\n 
-DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating 
\\\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT\n\n\n\n\n\n\n\nWhen the run 
completes successfully, you should see a new directory named \nmyapexapp\n 
containing a maven project for building a basic Apache Apex application. It 
includes 3 source files:\nApplication.java\n,  \nRandomNumberGenerator.java\n 
and \nApplicationTest.java\n. You can now build the application by stepping 
into the new directory and running the maven package command:\n\n\ncd 
myapexapp\nmvn clean package -DskipTests\n\n\n\nThe build should create the 
application package file \nmyapexapp/target/myapexapp-1.0-SNAPSHOT.apa\n. 
This application package can then be used to launch example application 
via \ndtCli\n, or other visual management tools.  When running, this 
application will generate a stream of random numbers and print them out, each 
prefixed by the string \nhello world:\n.\n\n\nRunning Unit Tests\n\n\nTo run 
unit tests on Linux or OSX, simply run the usual maven command, for example: 
\nmvn test\n.\n\n\nOn Windows, an additional file, \nwinutils.exe\n, is 
required; download it 
from\n\nhttps://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip\n\nand
 unpack the archive to, say, \nC:\\hadoop\n; this file should be present 
under\n\nhadoop-common-2.2.0-bin-master\\bin\n within it.\n\n\nSet the 
\nHADOOP_HOME\n environment variable system-wide 
to\n\nc:\\hadoop\\hadoop-common-2.2.0-bin-master\n as described 
at:\n\nhttps://www.microsoft.com/resources/documentation/windows/xp/all/proddocs/en-us/sysdm_advancd_environmnt_addchange_variable.mspx?mfr=true\n.
 You should now be able to run unit tests normally.\n\n\nIf you prefer not to 
set the variable globally, you can 
set it on the command line or within\nyour IDE. For example, on the command 
line, specify the maven\nproperty \nhadoop.home.dir\n:\n\n\nmvn 
-Dhadoop.home.dir=c:\\hadoop\\hadoop-common-2.2.0-bin-master test\n\n\n\nor set 
the environment variable separately:\n\n\nset 
HADOOP_HOME=c:\\hadoop\\hadoop-common-2.2.0-bin-master\nmvn test\n\n\n\nWithin 
your IDE, set the environment variable and then run the desired\nunit test in 
the usual way. For example, with NetBeans you can 
add:\n\n\nEnv.HADOOP_HOME=c:/hadoop/hadoop-common-2.2.0-bin-master\n\n\n\nat 
\nProperties \n Actions \n Run project \n Set Properties\n.\n\n\nSimilarly, in 
Eclipse (Mars) add it to the\nproject properties at \nProperties \n Run/Debug 
Settings \n ApplicationTest\n\n Environment\n tab.\n\n\nBuilding Apex 
Demos\n\n\nIf you want to see more substantial Apex demo applications and the 
associated source code, you can follow these simple steps to check out and 
build them.\n\n\n\n\n\n\nCheck out the source code 
repositories:\n\n\ngit clone https://github.com/apache/incubator-apex-core\ngit 
clone https://github.com/apache/incubator-apex-malhar\n\n\n\n\n\n\n\nSwitch to 
the appropriate release branch and build each repository:\n\n\ncd 
incubator-apex-core\nmvn clean install -DskipTests\n\ncd 
incubator-apex-malhar\nmvn clean install -DskipTests\n\n\n\n\n\n\n\nThe 
\ninstall\n argument to the \nmvn\n command installs resources from each 
project to your local maven repository (typically \n.m2/repository\n under your 
home directory), and \nnot\n to the system directories, so Administrator 
privileges are not required. The  \n-DskipTests\n argument skips running unit 
tests since they take a long time. If this is a first-time installation, it 
might take several minutes to complete because maven will download a number of 
associated plugins.\n\n\nAfter the build completes, you should see the demo 
application package files in the target directory under each demo 
subdirectory in 
\nincubator-apex-malhar/demos\n.\n\n\nSandbox\n\n\nTo jump start development 
with an Apache Hadoop single node cluster, \nDataTorrent Sandbox\n powered by 
VirtualBox is available on Windows, Linux, or Mac platforms.  The sandbox is 
configured by default to run with 6GB RAM; if your development machine has 16GB 
or more, you can increase the sandbox RAM to 8GB or more using the VirtualBox 
console.  This will yield better performance and support larger applications.  
The advantage of developing in the sandbox is that most of the tools (e.g. 
\njdk\n, \ngit\n, \nmaven\n), Hadoop YARN and HDFS, and a distribution of 
Apache Apex and DataTorrent RTS are pre-installed.  The disadvantage is that 
the sandbox is a memory-limited environment, and requires settings changes and 
restarts to adjust memory available for development and testing.", 
-            "title": "Development Setup"
-        }, 
-        {
-            "location": 
"/apex_development_setup/#apache-apex-development-environment-setup", 
-            "text": "This document discusses the steps needed for setting up a 
development environment for creating applications that run on the Apache Apex 
platform.", 
-            "title": "Apache Apex Development Environment Setup"
-        }, 
-        {
-            "location": "/apex_development_setup/#development-tools", 
-            "text": "There are a few tools that will be helpful when 
developing Apache Apex applications, including:    git  - A revision control 
system (version 1.7.1 or later). There are multiple git clients available for 
Windows ( http://git-scm.com/download/win  for example), so download and 
install a client of your choice.    java JDK  (not JRE) - Includes the Java 
Runtime Environment as well as the Java compiler and a variety of tools 
(version 1.7.0_79 or later). Can be downloaded from the Oracle website.    
maven  - Apache Maven is a build system for Java projects (version 3.0.5 or 
later). It can be downloaded from  https://maven.apache.org/download.cgi .    
IDE  (Optional) - If you prefer to use an IDE (Integrated Development 
Environment) such as  NetBeans ,  Eclipse  or  IntelliJ , install that as well. 
   After installing these tools, make sure that the directories containing the 
executable files are in your PATH environment variable.   Windows  - Open a 
console window and
  enter the command  echo %PATH%  to see the value of the  PATH  variable and 
verify that the above directories for Java, git, and maven executables are 
present.  For JDK executables like  java  and  javac , the directory might be 
something like  C:\\Program Files\\Java\\jdk1.7.0\\_80\\bin ; for  git  it 
might be  C:\\Program Files\\Git\\bin ; and for maven it might be  
C:\\Users\\user\\Software\\apache-maven-3.3.3\\bin .  If not, you can change 
its value by clicking on the button at  Control Panel     Advanced System Settings 
    Advanced tab     Environment Variables .  Linux and Mac  - Open a 
console/terminal window and enter the command  echo $PATH  to see the value of 
the  PATH  variable and verify that the above directories for Java, git, and 
maven executables are present.  If not, make sure software is downloaded and 
installed, and optionally PATH reference is added and exported  in a  
~/.profile  or  ~/.bash_profile .  For example to add maven located in  
/sfw/maven/apache-maven-3.3.3  to PATH add the line:  export 
PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin   Confirm by running the 
following commands and comparing with the output shown 
in the table below:         Command  Output    javac -version  javac 1.7.0_80   
 java -version  java version  1.7.0_80  Java(TM) SE Runtime Environment (build 
1.7.0_80-b15)  Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)  
  git --version  git version 2.6.1.windows.1    mvn --version  Apache Maven 
3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T06:57:37-05:00)  
...", 
-            "title": "Development Tools"
-        }, 
-        {
-            "location": "/apex_development_setup/#creating-new-apex-project", 
-            "text": "After development tools are configured, you can now use 
the maven archetype to create a basic Apache Apex project.   Note:  When 
executing the commands below, replace  3.3.0-incubating  by  latest available 
version  of Apache Apex.    Windows  - Create a new Windows command file called 
 newapp.cmd  by copying the lines below, and execute it.  When you run this 
file, the properties will be displayed and you will be prompted with  Y: : ; 
just press  Enter  to complete the project generation.  The caret (^) at the 
end of some lines indicates that a continuation line follows.   @echo off\n@rem 
Script for creating a new application\nsetlocal\nmvn archetype:generate ^\n 
-DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype 
-DarchetypeVersion=3.3.0-incubating ^\n -DgroupId=com.example 
-Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n 
-Dversion=1.0-SNAPSHOT\nendlocal    Linux  - Execute the lines below in a 
terminal window.  New projec
 t will be created in the curent working directory.  The backslash (\\) at the 
end of the lines indicates continuation.  mvn archetype:generate \\\n 
-DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating 
\\\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT    When the run completes 
successfully, you should see a new directory named  myapexapp  containing a 
maven project for building a basic Apache Apex application. It includes 3 
source files: Application.java ,   RandomNumberGenerator.java  and  
ApplicationTest.java . You can now build the application by stepping into the 
new directory and running the maven package command:  cd myapexapp\nmvn clean 
package -DskipTests  The build should create the application package file  
myapexapp/target/myapexapp-1.0-SNAPSHOT.apa . This application package can then 
be used to launch example application via  dtCli , or other visual 
 management tools.  When running, this application will generate a stream of 
random numbers and print them out, each prefixed by the string  hello world: 
.", 
-            "title": "Creating New Apex Project"
-        }, 
-        {
-            "location": "/apex_development_setup/#running-unit-tests", 
-            "text": "To run unit tests on Linux or OSX, simply run the usual 
maven command, for example:  mvn test .  On Windows, an additional file,  
winutils.exe , is required; download it from 
https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip \nand 
unpack the archive to, say,  C:\\hadoop ; this file should be present under 
hadoop-common-2.2.0-bin-master\\bin  within it.  Set the  HADOOP_HOME  
environment variable system-wide to c:\\hadoop\\hadoop-common-2.2.0-bin-master  
as described at: 
https://www.microsoft.com/resources/documentation/windows/xp/all/proddocs/en-us/sysdm_advancd_environmnt_addchange_variable.mspx?mfr=true
 . You should now be able to run unit tests normally.  If you prefer not to set 
the variable globally, you can set it on the command line or within\nyour IDE. 
For example, on the command line, specify the maven\nproperty  hadoop.home.dir 
:  mvn -Dhadoop.home.dir=c:\\hadoop\\hadoop-common-2.2.0-bin-master test  or 
set the environment variable
  separately:  set HADOOP_HOME=c:\\hadoop\\hadoop-common-2.2.0-bin-master\nmvn 
test  Within your IDE, set the environment variable and then run the 
desired\nunit test in the usual way. For example, with NetBeans you can add:  
Env.HADOOP_HOME=c:/hadoop/hadoop-common-2.2.0-bin-master  at  Properties   
Actions   Run project   Set Properties .  Similarly, in Eclipse (Mars) add it 
to the\nproject properties at  Properties   Run/Debug Settings   
ApplicationTest  Environment  tab.", 
-            "title": "Running Unit Tests"
-        }, 
-        {
-            "location": "/apex_development_setup/#building-apex-demos", 
-            "text": "If you want to see more substantial Apex demo 
applications and the associated source code, you can follow these simple steps 
to check out and build them.    Check out the source code repositories:  git 
clone https://github.com/apache/incubator-apex-core\ngit clone 
https://github.com/apache/incubator-apex-malhar    Switch to the appropriate 
release branch and build each repository:  cd incubator-apex-core\nmvn clean 
install -DskipTests\n\ncd incubator-apex-malhar\nmvn clean install -DskipTests  
  The  install  argument to the  mvn  command installs resources from each 
project to your local maven repository (typically  .m2/repository  under your 
home directory), and  not  to the system directories, so Administrator 
privileges are not required. The   -DskipTests  argument skips running unit 
tests since they take a long time. If this is a first-time installation, it 
might take several minutes to complete because maven will download a number of 
associated plugins.  After the build completes, you should see the demo 
application package files in 
the target directory under each demo subdirectory in  
incubator-apex-malhar/demos .", 
-            "title": "Building Apex Demos"
-        }, 
-        {
-            "location": "/apex_development_setup/#sandbox", 
-            "text": "To jump start development with an Apache Hadoop single 
node cluster,  DataTorrent Sandbox  powered by VirtualBox is available on 
Windows, Linux, or Mac platforms.  The sandbox is configured by default to run 
with 6GB RAM; if your development machine has 16GB or more, you can increase 
the sandbox RAM to 8GB or more using the VirtualBox console.  This will yield 
better performance and support larger applications.  The advantage of 
developing in the sandbox is that most of the tools (e.g.  jdk ,  git ,  maven 
), Hadoop YARN and HDFS, and a distribution of Apache Apex and DataTorrent RTS 
are pre-installed.  The disadvantage is that the sandbox is a memory-limited 
environment, and requires settings changes and restarts to adjust memory 
available for development and testing.", 
-            "title": "Sandbox"
-        }, 
-        {
-            "location": "/application_development/", 
-            "text": "Application Developer Guide\n\n\nThe Apex platform is 
designed to process massive amounts of\nreal-time events natively in Hadoop.  
It runs as a YARN (Hadoop 2.x) \napplication and leverages Hadoop as a 
distributed operating\nsystem.  All the basic distributed operating system 
capabilities of\nHadoop like resource management (YARN), distributed file 
system (HDFS),\nmulti-tenancy, security, fault-tolerance, and scalability are 
supported natively \nin all the Apex applications. \u00a0The platform handles 
all the details of the application \nexecution, including dynamic scaling, 
state checkpointing and recovery, event \nprocessing guarantees, etc. allowing 
you to focus on writing your application logic without\nmixing operational and 
functional concerns.\n\n\nIn the platform, building a streaming application can 
be extremely\neasy and intuitive. \u00a0The application is represented as a 
Directed\nAcyclic Graph (DAG) of computation units called \nOperators\n 
interconnected\nby the data-flow edges called  \nStreams\n.\u00a0The operators 
process input\nstreams and produce output streams. A library of common 
operators is\nprovided to enable quick application development. \u00a0In case 
the desired\nprocessing is not available in the Operator Library, one can 
easily\nwrite a custom operator. We refer those interested in creating their 
own\noperators to the \nOperator Development Guide\n.\n\n\nRunning A Test 
Application\n\n\nIf you are starting with the Apex platform for the first 
time,\nit can be informative to launch an existing application and see it 
run.\nOne of the simplest examples provided in \nApex-Malhar repository\n is a 
Pi demo application,\nwhich computes the value of PI using random numbers.  
After \nsetting up development environment\n\nPi demo can be launched as 
follows:\n\n\n\n\nOpen up Apex Malhar files in your IDE (for example Eclipse, 
IntelliJ, NetBeans, etc)\n\n\nNavigate to 
\ndemos/pi/src/test/java/com/datatorrent/demos/ApplicationTest.java\n\n\nRun 
the test for ApplicationTest.java\n\n\nView the output 
in system console\n\n\n\n\nCongratulations, you just ran your first real-time 
streaming demo :) \nThis demo is very simple and has four operators. The first 
operator\nemits random integers between 0 and 30,000. The second operator 
receives\nthese coefficients and emits a hashmap with x and y values each time 
it\nreceives two values. The third operator takes these values and 
computes\nx**2+y**2. The last operator counts how many computed values 
from\nthe previous operator were less than or equal to 30,000**2. 
Assuming\nthis count is N, then PI is computed as 4*N/number of values 
received.\nHere is the code snippet for the PI application. This code populates 
the\nDAG. Do not worry about what each line does, we will cover these\nconcepts 
later in this document.\n\n\n// Generates random numbers\nRandomEventGenerator 
rand = dag.addOperator(\nrand\n, new 
RandomEventGenerator());\nrand.setMinvalue(0);\nrand.setMaxvalue(30000);\n\n// 
Generates a round robin HashMap of \nx\n and 
\ny\n\nRoundRobinHashMap\nString,Object\n rrhm = dag.addOperator(\nrrhm\n, new 
RoundRobinHashMap\nString, Object\n());\nrrhm.setKeys(new String[] { \nx\n, 
\ny\n });\n\n// Calculates pi from x and y\nJavaScriptOperator calc = 
dag.addOperator(\npicalc\n, new 
Script());\ncalc.setPassThru(false);\ncalc.put(\ni\n,0);\ncalc.put(\ncount\n,0);\ncalc.addSetupScript(\nfunction
 pi() { if (x*x+y*y \n= \n+maxValue*maxValue+\n) { i++; } count++; return i / 
count * 4; }\n);\ncalc.setInvoke(\npi\n);\ndag.addStream(\nrand_rrhm\n, 
rand.integer_data, rrhm.data);\ndag.addStream(\nrrhm_calc\n, rrhm.map, 
calc.inBindings);\n\n// puts results on system console\nConsoleOutputOperator 
console = dag.addOperator(\nconsole\n, new 
ConsoleOutputOperator());\ndag.addStream(\nrand_console\n,calc.result, 
console.input);\n\n\n\n\nYou can review the other demos and see what they do. 
The examples\ngiven in the Demos project cover various features of the 
platform and we\nstrongly encourage you to read these to familiarize yourself 
with the\nplatform. In the remaining part of this document we will go 
through\ndetails needed for you to develop and run streaming applications 
in\nMalhar.\n\n\nTest Application: Yahoo! Finance Quotes\n\n\nThe 
PI\u00a0application was to\nget you started. It is a basic application and does 
not fully illustrate\nthe features of the platform. For the purpose of 
describing concepts, we\nwill consider the test application shown in Figure 1. 
The application\ndownloads tick data from  \nYahoo! Finance\n \u00a0and 
computes the\nfollowing for four tickers, namely \nIBM\n,\n\nGOOG\n, 
\nAAPL\n, \nYHOO\n.\n\n\n\n\nQuote: Consisting of last trade price, last trade time, 
and\n    total volume for the day\n\n\nPer-minute chart data: Highest trade 
price, lowest trade\n    price, and volume during that minute\n\n\nSimple 
Moving Average: trade price over 5 minutes\n\n\n\n\nTotal volume must ensure 
that all trade volume for that day is\nadded, i.e. data loss would result in 
wrong results. Charting data needs\nall 
the trades in the same minute to go to the same slot, and then on it\nstarts 
afresh, so again data loss would result in wrong results. The\naggregation for 
charting data is done over 1 minute. Simple moving\naverage computes the 
average price over a 5 minute sliding window; it\ntoo would produce wrong 
results if there is data loss. Figure 1 shows\nthe application with no 
partitioning.\n\n\n\n\nThe operator 
StockTickerInput:\u00a0StockTickerInput\n\u00a0\nis\nthe input operator that 
reads live data from Yahoo! Finance once per\ninterval (user configurable in 
milliseconds), and emits the price, the\nincremental volume, and the last trade 
time of each stock symbol, thus\nemulating real ticks from the exchange. 
\u00a0We utilize the Yahoo! Finance\nCSV web service interface. \u00a0For 
example:\n\n\n$ GET 
'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1'\n\nIBM\n,203.966,1513041,\n1:43pm\n\n\nGOOG\n,762.68,1879741,\n1:43pm\n\n\nAAPL\n,444.3385,11738366,\n1:43pm\n\n\nYHOO\n,19.3681,14707163,\n1:43pm\n\n\n\n\n\nAmong
 all the operators in Figure 1, StockTickerInput is the only\noperator that 
requires extra code because it contains a custom mechanism\nto get the input 
data. \u00a0Other operators are used unchanged from the\nMalhar 
library.\n\n\nHere is the class implementation for StockTickInput:\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
au.com.bytecode.opencsv.CSVReader;\nimport 
com.datatorrent.annotation.OutputPortFieldAnnotation;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DefaultOutputPort;\nimport 
com.datatorrent.api.InputOperator;\nimport 
com.datatorrent.lib.util.KeyValPair;\nimport java.io.IOException;\nimport 
java.io.InputStream;\nimport java.io.InputStreamReader;\nimport 
java.util.*;\nimport org.apache.commons.httpclient.HttpClient;\nimport 
org.apache.commons.httpclient.HttpStatus;\nimport 
org.apache.commons.httpclient.cookie.CookiePolicy;\nimport 
org.apache.commons.httpclient.methods.GetMethod;\nimport 
org.apache.commons.httpclient.params.DefaultHttpParams;\nimport 
org.slf4j.Logger;\nimport org.slf4j.LoggerFactory;\n\n/**\n * This operator 
sends price, volume and time into separate ports and calculates incremental 
volume.\n */\npublic class StockTickInput implements InputOperator\n{\n  
private static final Logger logger = 
LoggerFactory.getLogger(StockTickInput.class);\n  /**\n   * Timeout interval 
for reading from server. 0 or negative indicates no timeout.\n   */\n  public 
int readIntervalMillis = 500;\n  /**\n   * The URL of the web service resource 
for the GET request.\n   */\n  private String url;\n  public String[] 
symbols;\n  private transient HttpClient client;\n  private transient GetMethod 
method;\n  private HashMap\nString, Long\n lastVolume = new HashMap\nString, 
Long\n();\n  private boolean outputEvenIfZeroVolume = false;\n  /**\n   * The 
output port to emit price.
 \n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final 
transient DefaultOutputPort\nKeyValPair\nString, Double\n price = new 
DefaultOutputPort\nKeyValPair\nString, Double\n();\n  /**\n   * The output port 
to emit incremental volume.\n   */\n  @OutputPortFieldAnnotation(optional = 
true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, Long\n 
volume = new DefaultOutputPort\nKeyValPair\nString, Long\n();\n  /**\n   * The 
output port to emit last traded time.\n   */\n  
@OutputPortFieldAnnotation(optional = true)\n  public final transient 
DefaultOutputPort\nKeyValPair\nString, String\n time = new 
DefaultOutputPort\nKeyValPair\nString, String\n();\n\n  /**\n   * Prepare URL 
from symbols and parameters. URL will be something like: 
http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1\n 
  *\n   * @return the URL\n   */\n  private String prepareURL()\n  {\n    
String str = \"http://download.finance.yahoo.com/d/quotes.csv?s=\";\n    for 
(int i = 0; i < symbols.length; i++) {\n      if (i != 0) {\n        str += 
\",\";\n      }\n      str += symbols[i];\n    }\n    str += 
\"&f=sl1vt1&e=.csv\";\n    return str;\n  }\n\n  @Override\n  public void 
setup(OperatorContext context)\n  {\n    url = prepareURL();\n    client = new 
HttpClient();\n    method = new GetMethod(url);\n    
DefaultHttpParams.getDefaultParams().setParameter(\"http.protocol.cookie-policy\",
 CookiePolicy.BROWSER_COMPATIBILITY);\n  }\n\n  @Override\n  public void 
teardown()\n  {\n  }\n\n  @Override\n  public void emitTuples()\n  {\n\n    try 
{\n      int statusCode = client.executeMethod(method);\n      if (statusCode 
!= HttpStatus.SC_OK) {\n        System.err.println(\"Method failed: \" + 
method.getStatusLine());\n      } else {\n        InputStream istream = 
method.getResponseBodyAsStream();\n        // Process response\n        
InputStreamReader isr = new InputStreamReader(istream);\n        CSVReader 
reader = new CSVReader(isr);\n        List<String[]> myEntries = 
reader.readAll();\n        for (String[] stringArr: 
myEntries) {\n          ArrayList<String> tuple = new 
ArrayList<String>(Arrays.asList(stringArr));\n          if (tuple.size() != 
4) {\n            return;\n          }\n          // input csv is 
\"Symbol\",\"Price\",\"Volume\",\"Time\"\n          String symbol = 
tuple.get(0);\n          double currentPrice = Double.valueOf(tuple.get(1));\n  
        long currentVolume = Long.valueOf(tuple.get(2));\n          String 
timeStamp = tuple.get(3);\n          long vol = currentVolume;\n          // 
Sends total volume in first tick, and incremental volume afterwards.\n          
if (lastVolume.containsKey(symbol)) {\n            vol -= 
lastVolume.get(symbol);\n          }\n\n          if (vol > 0 || 
outputEvenIfZeroVolume) {\n            price.emit(new KeyValPair<String, 
Double>(symbol, currentPrice));\n            volume.emit(new 
KeyValPair<String, Long>(symbol, vol));\n            time.emit(new 
KeyValPair<String, String>(symbol, timeStamp));\n            lastVolume.put(symbol, 
currentVolume);\n          }\n        }\n      }\n      
Thread.sleep(readIntervalMillis);\n    } catch (InterruptedException ex) {\n    
  logger.debug(ex.toString());\n    } catch (IOException ex) {\n      
logger.debug(ex.toString());\n    }\n  }\n\n  @Override\n  public void 
beginWindow(long windowId)\n  {\n  }\n\n  @Override\n  public void 
endWindow()\n  {\n  }\n\n  public void setOutputEvenIfZeroVolume(boolean 
outputEvenIfZeroVolume)\n  {\n    this.outputEvenIfZeroVolume = 
outputEvenIfZeroVolume;\n  }\n\n}\n\n\n\n\nThe operator has three output ports 
that emit the price of the\nstock, the volume of the stock and the last trade 
time of the stock,\ndeclared as public member variables price, volume\u00a0and  
time\u00a0of the class. \u00a0The tuple of the\nprice\u00a0output port is a 
key-value\npair with the stock symbol being the key, and the price being the 
value.\n\u00a0The tuple of the volume\u00a0output\nport is a key-value pair 
with the stock symbol being the key, and the\nincremental 
volume being the value. \u00a0The tuple of the time\u00a0output port is a 
key-value pair with the\nstock symbol being the key, and the last trade time being 
the\nvalue.\n\n\nImportant: Since operators will be\nserialized, all input and 
output ports need to be declared transient\nbecause they are stateless and 
should not be serialized.\n\n\nThe method\u00a0setup(OperatorContext)\ncontains 
the code that is necessary for setting up the HTTP\nclient for querying Yahoo! 
Finance.\n\n\nMethod\u00a0emitTuples() contains\nthe code that reads from 
Yahoo! Finance, and emits the data to the\noutput ports of the operator. 
\u00a0emitTuples()\u00a0will be called one or more times\nwithin one 
application window as long as time is allowed within the\nwindow.\n\n\nNote 
that we want to emulate the tick input stream by having\nincremental volume 
data with Yahoo! Finance data. \u00a0We therefore subtract\nthe previous volume 
from the current volume to emulate incremental\nvolume for each tick.\n\n\nThe 
operator\nDailyVolume:\u00a0This operator\nreads from the input port, which 
contains the incremental volume tuples\nfrom StockTickInput, and\naggregates 
the data to provide the cumulative volume. \u00a0It uses the\nlibrary class  
SumKeyVal<K,V>\u00a0provided in the math\u00a0package. \u00a0In this 
case,\nSumKeyVal<String,Long>, where K is the stock symbol, V is 
the\naggregated volume, with cumulative\nset to true. (Otherwise, if 
cumulative were set to false, SumKeyVal would\nprovide the sum for the 
application window.) \u00a0Malhar provides a number\nof built-in operators for 
simple operations like this so that\napplication developers do not have to 
write them. \u00a0More examples to\nfollow. This operator assumes that the 
application restarts before the\nmarket opens every day.\n\n\nThe operator 
Quote:\nThis operator has three input ports, which are price 
(from\nStockTickInput), daily_vol (from\nDaily Volume), and time (from\n 
StockTickInput). \u00a0This operator\njust consolidates 
the three data items and emits the consolidated\ndata. \u00a0It utilizes 
the class ConsolidatorKeyVal<K>\u00a0from the\nstream\u00a0package.\n\n\nThe 
operator HighLow:\u00a0This operator reads from the input port,\nwhich contains 
the price tuples from StockTickInput, and provides the high and the\nlow price 
within the application window. \u00a0It utilizes the library class\n 
RangeKeyVal<K,V>\u00a0provided\nin the math\u00a0package. In this 
case,\nRangeKeyVal<String,Double>.\n\n\nThe operator MinuteVolume:\nThis 
operator reads from the input port, which contains the\nvolume tuples from 
StockTickInput,\nand aggregates the data to provide the sum of the volume 
within one\nminute. \u00a0Like the operator  DailyVolume, this operator also 
uses\nSumKeyVal<String,Long>, but\nwith cumulative set to false. 
\u00a0The\nApplication Window is set to one minute. We will explain how to set 
this\nlater.\n\n\nThe
  operator Chart:\nThis operator is very similar to the operator Quote, except 
that it takes inputs from\nHigh Low\u00a0and  Minute Vol\u00a0and outputs the 
consolidated tuples\nto the output port.\n\n\nThe operator PriceSMA:\nSMA 
stands for Simple Moving Average. It reads from the\ninput port, which 
contains the price tuples from StockTickInput, and\nprovides the moving average 
price of the stock. \u00a0It utilizes\nSimpleMovingAverage<String,Double>, 
which is provided in the\n multiwindow\u00a0package.\nSimpleMovingAverage keeps 
track of the data of the previous N\napplication windows in a sliding manner. 
\u00a0For each end window event, it\nprovides the average of the data in those 
application windows.\n\n\nThe operator Console:\nThis operator just outputs the 
input tuples to the console\n(or stdout). \u00a0In this example, there are four 
console\u00a0operators, which connect to the output\nof  Quote, Chart, PriceSMA 
and VolumeSMA. \u00a0In\npractice, they should be replaced by operators 
that use the data to\nproduce visualization artifacts like 
charts.\n\n\nConnecting the operators together and constructing 
the\nDAG:\u00a0Now that we know the\noperators used, we will create the DAG, 
set the streaming window size,\ninstantiate the operators, and connect the 
operators together by adding\nstreams that connect the output ports with the 
input ports among those\noperators. \u00a0This code is in the file  
YahooFinanceApplication.java. Refer to Figure 1\nagain for the graphical 
representation of the DAG. \u00a0The last method in\nthe code, namely 
populateDAG(),\ndoes all that. \u00a0The rest of the methods are just for 
setting up the\noperators.\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
com.datatorrent.api.ApplicationFactory;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DAG;\nimport 
com.datatorrent.api.Operator.InputPort;\nimport 
com.datatorrent.lib.io.ConsoleOutputOperator;\nimport 
com.datatorrent.lib.math.RangeKeyVal;\nimport com.datatorrent.lib.math.SumKeyVal;\nimport 
com.datatorrent.lib.multiwindow.SimpleMovingAverage;\nimport 
com.datatorrent.lib.stream.ConsolidatorKeyVal;\nimport 
com.datatorrent.lib.util.HighLow;\nimport 
org.apache.hadoop.conf.Configuration;\n\n/**\n * Yahoo! Finance application 
demo. <p>\n *\n * Get Yahoo finance feed and calculate minute price range, 
minute volume, simple moving average of 5 minutes.\n */\npublic class 
Application implements StreamingApplication\n{\n  private int 
streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)\n  
private int appWindowCountMinute = 60;   // 1 minute\n  private int 
appWindowCountSMA = 5 * 60;  // 5 minute\n\n  /**\n   * Get actual Yahoo 
finance ticks of symbol, last price, total daily volume, and last traded 
time.\n   */\n  public StockTickInput getStockTickInputOperator(String name, 
DAG dag)\n  {\n    StockTickInput oper = dag.addOperator(name, 
StockTickInput.class);\n    oper.readIntervalMillis = 200;
 \n    return oper;\n  }\n\n  /**\n   * This sends total daily volume by adding 
volumes from each tick.\n   */\n  public SumKeyVal<String, Long> 
getDailyVolumeOperator(String name, DAG dag)\n  {\n    SumKeyVal<String, 
Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());\n    
oper.setType(Long.class);\n    oper.setCumulative(true);\n    return oper;\n  
}\n\n  /**\n   * Get aggregated volume of 1 minute and send at the end window 
of 1 minute.\n   */\n  public SumKeyVal<String, Long> 
getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)\n  {\n    
SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, 
Long>());\n    oper.setType(Long.class);\n    
oper.setEmitOnlyWhenChanged(true);\n    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    return oper;\n  }\n\n  /**\n   * Get High-low range for 1 minute.\n   */\n  
public RangeKeyVal<String, Double> getHighLowOperator(String name, 
DAG dag, int appWindowCount)\n  {\n    RangeKeyVal<String, Double> oper = 
dag.addOperator(name, new RangeKeyVal<String, Double>());\n    
dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    oper.setType(Double.class);\n    return oper;\n  }\n\n  /**\n   * Quote 
(Merge price, daily volume, time)\n   */\n  public 
ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String 
name, DAG dag)\n  {\n    ConsolidatorKeyVal<String,Double,Long,String,?,?> 
oper = dag.addOperator(name, new 
ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());\n    return 
oper;\n  }\n\n  /**\n   * Chart (Merge minute volume and minute high-low)\n   
*/\n  public ConsolidatorKeyVal<String,HighLow,Long,?,?,?> 
getChartOperator(String name, DAG dag)\n  {\n    
ConsolidatorKeyVal<String,HighLow,Long,?,?,?> oper = dag.addOperator(name, 
new ConsolidatorKeyVal<String,HighLow,Long,Object,Object,Object>());\n    
return oper;\n  }
 \n\n  /**\n   * Get simple moving average of price.\n   */\n  public 
SimpleMovingAverage<String, Double> 
getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)\n 
 {\n    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new 
SimpleMovingAverage<String, Double>());\n    
oper.setWindowSize(appWindowCount);\n    oper.setType(Double.class);\n    
return oper;\n  }\n\n  /**\n   * Get console for output.\n   */\n  public 
InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, 
String prefix)\n  {\n    ConsoleOutputOperator oper = dag.addOperator(name, 
ConsoleOutputOperator.class);\n    oper.setStringFormat(prefix + \": %s\");\n   
 return oper.input;\n  }\n\n  /**\n   * Create Yahoo Finance Application DAG.\n 
  */\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  
{\n    
dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getStockTickInputOperator(\"StockTickInput\", dag);\n    
SumKeyVal<String, Long> dailyVolume = 
getDailyVolumeOperator(\"DailyVolume\", dag);\n    
ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = 
getQuoteOperator(\"Quote\", dag);\n\n    RangeKeyVal<String, Double> highlow 
= getHighLowOperator(\"HighLow\", dag, appWindowCountMinute);\n    
SumKeyVal<String, Long> minuteVolume = 
getMinuteVolumeOperator(\"MinuteVolume\", dag, appWindowCountMinute);\n    
ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = 
getChartOperator(\"Chart\", dag);\n\n    SimpleMovingAverage<String, Double> 
priceSMA = getPriceSimpleMovingAverageOperator(\"PriceSMA\", dag, 
appWindowCountSMA);\n    DefaultPartitionCodec<String, Double> codec = new 
DefaultPartitionCodec<String, Double>();\n    
dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);\n    
dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);\n    
dag.addStream(\"price\", tick.price, quoteOperator.in1, 
highlow.data, priceSMA.data);\n    dag.addStream(\"vol\", tick.volume, 
dailyVolume.data, minuteVolume.data);\n    dag.addStream(\"time\", tick.time, 
quoteOperator.in3);\n    dag.addStream(\"daily_vol\", dailyVolume.sum, 
quoteOperator.in2);\n\n    dag.addStream(\"quote_data\", quoteOperator.out, 
getConsole(\"quoteConsole\", dag, \"QUOTE\"));\n\n    
dag.addStream(\"high_low\", highlow.range, chartOperator.in1);\n    
dag.addStream(\"vol_1min\", minuteVolume.sum, chartOperator.in2);\n    
dag.addStream(\"chart_data\", chartOperator.out, getConsole(\"chartConsole\", 
dag, \"CHART\"));\n\n    dag.addStream(\"sma_price\", priceSMA.doubleSMA, 
getConsole(\"priceSMAConsole\", dag, \"Price SMA\"));\n  
}\n\n}\n\n\n\n\nNote that we also set a user-specific sliding window for SMA 
that\nkeeps track of the previous N data points. \u00a0Do not confuse this with 
the\nattribute APPLICATION_WINDOW_COUNT.\n\n\nIn the rest of this chapter we 
will run through the process of\nrunning this application. 
We assume that \u00a0you are familiar with the details\nof 
your Hadoop infrastructure. For installation\ndetails please refer to the 
\nInstallation Guide\n.\n\n\nRunning a Test Application\n\n\nWe will now 
describe how to run the yahoo\nfinance application\u00a0described above in 
different modes\n(local mode, single node on Hadoop, and multi-nodes on 
Hadoop).\n\n\nThe platform runs streaming applications under the control of 
a\nlight-weight Streaming Application Manager (STRAM). Each application 
has\nits own instance of STRAM. STRAM launches the application and\ncontinually 
provides run time monitoring, analysis, and takes action\nsuch as load scaling 
or outage recovery as needed. \u00a0We will discuss\nSTRAM in more detail in 
the next chapter.\n\n\nThe instructions below assume that the platform was 
installed in a\ndirectory <INSTALL_DIR> and the command line interface (CLI) 
will\nbe used to launch the demo application. An application can be run 
in\nlocal mode\u00a0(in IDE or from command line) or on a Hadoop cluster.\n\n\nTo start the 
dtCli run\n\n\n<INSTALL_DIR>/bin/dtcli\n\n\n\nThe command line prompt appears.  
To start the application in local mode (the actual version number in the file 
name may differ)\n\n\ndt> launch -local 
<INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo terminate the 
application in local mode, enter Ctrl-C\n\n\nTo run the application on the 
Hadoop cluster (the actual version\nnumber in the file name may 
differ)\n\n\ndt> launch 
<INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo stop the 
application running in Hadoop, terminate it in the dtCli:\n\n\ndt> 
kill-app\n\n\n\nExecuting the application in either mode includes the 
following\nsteps. At a top level, STRAM (Streaming Application Manager) 
validates\nthe application (DAG), translates the logical plan to the physical 
plan\nand then launches the execution engine. The mode determines 
the\nresources needed and how they are used.\n\n\n
Local Mode\n\n\nIn local mode, the application is run as a 
single-process\u00a0with multiple threads. Although a\nfew Hadoop classes are 
needed, there is no dependency on a Hadoop\ncluster or Hadoop services. The 
local file system is used in place of\nHDFS. This mode allows a quick run of an 
application in a single process\nsandbox, and hence is the most suitable to 
debug and analyze the\napplication logic. This mode is recommended for 
developing the\napplication and can be used for running applications within the 
IDE for\nfunctional testing purposes. Due to limited resources and lack 
\u00a0of\nscalability an application running in this single process mode is 
more\nlikely to encounter throughput bottlenecks. A distributed cluster 
is\nrecommended for benchmarking and production testing.\n\n\nHadoop 
Cluster\n\n\nIn this section we discuss various Hadoop cluster 
setups.\n\n\nSingle Node Cluster\n\n\nIn a single node Hadoop cluster all 
services are deployed on a\nsingle server (a developer can use 
his/her development machine as a\nsingle node cluster). The 
platform does not distinguish between a single\nor multi-node setup and behaves 
exactly the same in both cases.\n\n\nIn this mode, the resource manager, name 
node, data node, and node\nmanager occupy one process each. This is an example 
of running a\nstreaming application as a multi-process\u00a0application on the 
same server.\nWith prevalence of fast, multi-core systems, this mode is 
effective for\ndebugging, fine tuning, and generic analysis before submitting 
the job\nto a larger Hadoop cluster. In this mode, execution uses the 
Hadoop\nservices and hence is likely to identify issues that are related to 
the\nHadoop environment (such issues will not be uncovered in local mode).\nThe 
throughput will obviously not be as high as on a multi-node Hadoop\ncluster. 
Additionally, since each container (i.e. Java process) requires\na significant 
amount of memory, you will be able to run a much smaller\nnumber of containers
  than on a multi-node cluster.\n\n\nMulti-Node Cluster\n\n\nIn a multi-node 
Hadoop cluster all the services of Hadoop are\ntypically distributed across 
multiple nodes in a production or\nproduction-level test environment. Upon 
launch the application is\nsubmitted to the Hadoop cluster and executes as a  
multi-process application on\u00a0multiple nodes.\n\n\nBefore you start 
deploying, testing and troubleshooting your\napplication on a cluster, you 
should ensure that Hadoop (version 2.2.0\nor later)\u00a0is properly installed 
and\nyou have basic skills for working with it.\n\n\n\n\nApache Apex Platform 
Overview\n\n\nStreaming Computational Model\n\n\nIn this chapter, we describe 
the basics of the real-time streaming platform and its computational 
model.\n\n\nThe platform is designed to enable completely asynchronous real 
time computations\u00a0done in as unblocked a way as possible with\nminimal 
overhead.\n\n\nApplications running in the platform are represented by a 
Directed\nAcyclic Graph (DAG) 
made up of \u00a0operators and streams. All 
computations\nare done in memory on arrival of\nthe input data, with an option 
to save the output to disk (HDFS) in a\nnon-blocking way. The data that flows 
between operators consists of\natomic data elements. Each data element along 
with its type definition\n(henceforth called  schema) is\ncalled a 
tuple.\u00a0An application is a\ndesign of the flow of these tuples to and 
from\nthe appropriate compute units to enable the computation of the 
final\ndesired results.\u00a0A message queue (henceforth called\n\u00a0buffer 
server) manages tuples streaming\nbetween compute units in different 
processes. This server keeps track of\nall consumers, publishers, partitions, 
and enables replay. More\ninformation is given in a later section.\n\n\nThe 
streaming application is monitored by a decision making entity\ncalled STRAM 
(streaming application\nmanager).\u00a0STRAM is designed to be a light 
weight\ncontroller that has minimal but sufficient 
interaction with the\napplication. This is done via periodic 
heartbeats. The\nSTRAM does the initial launch and periodically analyzes the 
system\nmetrics to decide if any run time action needs to be taken.\n\n\nA 
fundamental building block for the streaming platform\nis the concept of 
breaking up a stream into equal finite time slices\ncalled streaming windows. 
Each window contains the ordered\nset of tuples in that time slice. A typical 
duration of a window is 500\nms, but can be configured per application (the 
Yahoo! Finance\napplication configures this value in the  
properties.xml\u00a0file to be 1000ms = 1s). Each\nwindow is preceded by a 
begin_window\u00a0event and is terminated by an\nend_window\u00a0event, and is 
assigned\na unique window ID. Even though the platform performs computations 
at\nthe tuple level, bookkeeping is done at the window boundary, making 
the\ncomputations within a window an atomic event in the platform. \u00a0We 
can\nthink of each window as an atomic\nmicro-batch\u00a0of tuples, 
to be processed together as one\natomic 
operation (See Figure 2). \u00a0\n\n\nThis atomic batching allows the platform 
to avoid the very steep\nper tuple bookkeeping cost and instead has a 
manageable per batch\nbookkeeping cost. This translates to higher throughput, 
low recovery\ntime, and higher scalability. Later in this document we 
illustrate how\nthe atomic micro-batch concept allows more efficient 
optimization\nalgorithms.\n\n\nThe platform also has in-built support 
for\napplication windows.\u00a0 An application window is part of 
the\napplication specification, and can be a small or large multiple of 
the\nstreaming window. \u00a0An example from our Yahoo! Finance test 
application\nis the moving average, calculated over a sliding application 
window of 5\nminutes which equates to 300 (= 5 * 60) streaming 
windows.\n\n\nNote that these two window concepts are distinct. \u00a0A 
streaming\nwindow is an abstraction of many tuples into a higher atomic 
 event for\neasier management. \u00a0An application window is a group of 
consecutive\nstreaming windows used for data aggregation (e.g. sum, average, 
maximum,\nminimum) on a per operator level.\n\n\n\n\nAlongside the 
platform,\u00a0a set of\npredefined, benchmarked standard library operator 
templates is provided\nfor ease of use and rapid development of 
application.\u00a0These\noperators are open sourced to Apache Software 
Foundation under the\nproject name \u201cMalhar\u201d as part of our efforts to 
foster community\ninnovation. These operators can be used in a DAG as is, while 
others\nhave properties\u00a0that can be set to specify the\ndesired 
computation. Those interested in details should refer to\n\nApex-Malhar 
operator library\n.\n\n\nThe platform is a Hadoop YARN native\napplication. It 
runs in a Hadoop cluster just like any\nother YARN application (MapReduce etc.) 
and is designed to seamlessly\nintegrate with the rest of the Hadoop technology stack. 
It leverages Hadoop as\nmuch as
  possible and relies on it as its distributed operating system.\nHadoop 
dependencies include resource management, compute/memory/network\nallocation, 
HDFS, security, fault tolerance, monitoring, metrics,\nmulti-tenancy, logging 
etc. Hadoop classes/concepts are reused as much\nas possible.  The aim is to 
enable enterprises\nto leverage their existing Hadoop infrastructure for real 
time streaming\napplications. The platform is designed to scale with big\ndata 
applications and scale with Hadoop.\n\n\nA streaming application is an 
asynchronous execution of\ncomputations across distributed nodes. All 
computations are done in\nparallel on a distributed cluster. The computation 
model is designed to\ndo as many parallel computations as possible in a 
non-blocking fashion.\nThe task of monitoring of the entire application is done 
on (streaming)\nwindow boundaries with a streaming window as an atomic entity. 
A window\ncompletion is a quantum of work done. There is no assumption that 
an\noperator can be interrupted at precisely a particular tuple or window.\n\n\nAn 
operator itself also\ncannot assume or predict the exact time a tuple that it 
emitted would\nget consumed by downstream operators. The operator processes the 
tuples\nit gets and simply emits new tuples based on its business logic. 
The\nonly guarantee it has is that the upstream operators are 
processing\neither the current or some later window, and the downstream 
operator is\nprocessing either the current or some earlier window. The 
completion of\na window (i.e. propagation of the  end_window\u00a0event through 
an operator) in any\noperator guarantees that all upstream operators have 
finished processing\nthis window. Thus, the end_window\u00a0event is blocking 
on an operator\nwith multiple inputs, and is a synchronization point in the 
DAG. The\n begin_window\u00a0event does not have\nany such restriction; a 
single begin_window\u00a0event from any upstream operator\ntriggers the 
operator to start processing tuples.\n\n\n
Streaming Application Manager (STRAM)\n\n\nStreaming Application 
Manager (STRAM) is the Hadoop YARN native\napplication master. STRAM is the 
first process that is activated upon\napplication launch and orchestrates the 
streaming application on the\nplatform. STRAM is a lightweight controller 
process. The\nresponsibilities of STRAM include\n\n\n\n\n\n\nRunning the 
Application\n\n\n\n\nRead the\u00a0logical plan\u00a0of the application (DAG) 
submitted by the client\n\n\nValidate the logical plan\n\n\nTranslate the 
logical plan into a physical plan, where certain operators may  be partitioned 
(i.e. replicated) to multiple operators for  handling load.\n\n\nRequest 
resources (Hadoop containers) from Resource Manager,\n    per physical 
plan\n\n\nBased on acquired resources and application attributes, create\n    
an execution plan\u00a0by partitioning the DAG into fragments,\n    each 
assigned to different containers.\n\n\nExecute the application by deploying 
each fragment to\n 
    its container. Containers then start stream processing and run\n    
autonomously, processing one streaming window after another. Each\n    
container is represented as an instance of the  StreamingContainer\u00a0class, 
which updates\n    STRAM via the heartbeat protocol and processes directions 
received\n    from STRAM.\n\n\n\n\n\n\n\n\nContinually monitoring the 
application via heartbeats from each StreamingContainer\n\n\n\n\nCollecting 
Application System Statistics and Logs\n\n\nLogging all application-wide 
decisions taken\n\n\nProviding system data on the state of the application via 
a  Web Service.\n\n\n\n\nSupporting Fault Tolerance\n\n\na.  Detecting a node 
outage\nb.  Requesting a replacement resource from the Resource Manager\n    
and scheduling state restoration for the streaming operators\nc.  Saving state 
to Zookeeper\n\n\n\n\n\n\nSupporting Dynamic Partitioning:\u00a0Periodically 
evaluating the SLA and modifying the physical plan if required\n    (logical 
plan does not
  change).\n\n\n\n\nEnabling Security:\u00a0Distributing security tokens for 
distributed components of the execution engine\n    and securing web service 
requests.\n\n\nEnabling Dynamic modification of DAG: In the future, we intend 
to allow for user initiated\n    modification of the logical plan to allow for 
changes to the\n    processing logic and functionality.\n\n\n\n\nAn example of 
the Yahoo! Finance Quote application scheduled on a\ncluster of 5 Hadoop 
containers (processes) is shown in Figure 3.\n\n\n\n\nAn example for the 
translation from a logical plan to a physical\nplan and an execution plan for a 
subset of the application is shown in\nFigure 4.\n\n\n\n\nHadoop 
Components\n\n\nIn this section we cover some aspects of Hadoop that 
your\nstreaming application interacts with. This section is not meant 
to\neducate the reader on Hadoop, but just get the reader acquainted with\nthe 
terms. We strongly advise readers to learn Hadoop from other\nsources.\n\n\nA 
streaming application
  runs as a native Hadoop 2.2 application.\nHadoop 2.2 does not differentiate 
between a map-reduce job and other\napplications, and hence as far as Hadoop is 
concerned, the streaming\napplication is just another job. This means that your 
application\nleverages all the bells and whistles Hadoop provides and is 
fully\nsupported within Hadoop technology stack. The platform is 
responsible\nfor properly integrating itself with the relevant components of 
Hadoop\nthat exist today and those that may emerge in the future.\n\n\nAll 
investments that leverage multi-tenancy (for example quotas\nand queues), 
security (for example Kerberos), data flow integration (for\nexample copying 
data in and out of HDFS), monitoring, metrics collections,\netc. will require no 
changes when streaming applications run 
on\nHadoop.\n\n\nYARN\n\n\nYARN\nis\nthe core library of Hadoop 2.2 that is 
tasked with resource management\nand works as a distributed application 
framework. In this section we\nwill walk through YARN's 
components. In Hadoop 2.2, the old JobTracker\nhas been replaced by a 
combination of ResourceManager (RM) and\nApplicationMaster (AM).\n\n\nResource 
Manager (RM)\n\n\nResourceManager\n(RM)\nmanages all the distributed resources. 
It allocates and arbitrates all\nthe slots and the resources (CPU, memory, 
network) of these slots. It\nworks with per-node NodeManagers (NMs) and 
per-application\nApplicationMasters (AMs). Currently memory usage is monitored 
by RM; in\nupcoming releases it will have CPU as well as network management. RM 
is\nshared by map-reduce and streaming applications. Running 
streaming\napplications requires no changes in the RM.\n\n\nApplication Master 
(AM)\n\n\nThe AM is the watchdog or monitoring process for your 
application\nand has the responsibility of negotiating resources with RM 
and\ninteracting with NodeManagers to get the allocated containers 
started.\nThe AM is the starting point of your application and is considered 
user\ncode (not system Hadoop code). The
  AM itself runs in one container. All\nresource management within the 
application is managed by the AM. This\nis a critical feature for Hadoop 2.2 
where tasks done by JobTracker in\nHadoop 1.0 have been distributed allowing 
Hadoop 2.2 to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN 
ApplicationMaster.\n\n\nNode Managers (NM)\n\n\nThere is one 
\nNodeManager\n(NM)\nper node in the cluster. All the containers (i.e. 
processes) on that\nnode are monitored by the NM. It takes instructions from RM 
and manages\nresources of that node as per RM instructions. NM interactions 
are the same\nfor map-reduce and for streaming applications. Running 
streaming\napplications requires no changes in the NM.\n\n\nRPC 
Protocol\n\n\nCommunication among RM, AM, and NM is done via the Hadoop 
RPC\nprotocol. Streaming applications use the same protocol to send 
their\ndata. No changes are needed in RPC support provided by Hadoop to 
enable\ncommunication done by components of your application.\n\n\nHDFS\n
 \n\nHadoop includes a highly fault tolerant, high throughput\ndistributed file 
system (\nHDFS\n).\nIt runs on commodity hardware, and your streaming 
application will, by\ndefault, use it. There is no difference between files 
created by a\nstreaming application and those created by 
map-reduce.\n\n\nDeveloping An Application\n\n\nIn this chapter we describe the 
methodology to develop an\napplication using the Realtime Streaming Platform. 
The platform was\ndesigned to make it easy to build and launch sophisticated 
streaming\napplications with the developer having to deal only with 
the\napplication/business logic. The platform deals with details of where 
to\nrun what operators on which servers and how to correctly route streams\nof 
data among them.\n\n\nDevelopment Process\n\n\nWhile the platform does not 
mandate a specific methodology or set\nof development tools, we have 
recommendations to maximize productivity\nfor the different phases of 
application development.\n\n\nDesign\n\n\n\n\nIdentify common, reusable 
operators. Use a library\n    if 
possible.\n\n\nIdentify scalability and performance requirements before\n    
designing the DAG.\n\n\nLeverage attributes that the platform supports for 
scalability\n    and performance.\n\n\nUse operators that are benchmarked and 
tested so that later\n    surprises are minimized. If you have glue code, 
create appropriate\n    unit tests for it.\n\n\nUse THREAD_LOCAL locality for 
high throughput streams. If all\n    the operators on that stream cannot fit in 
one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n  
  NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completely. The 
former uses intra-process communication to also avoid\n    
serialization-deserialization overhead.\n\n\nThe overall throughput and 
latencies are not necessarily\n    correlated to the number of operators in 
a simple way -- the\n    relationship is more nuanced. A lot depends on how 
much work\n    individual operators are doing, how many are able to 
operate in\n    parallel, and how much 
data is flowing through the arcs of the DAG.\n    It is, at times, better to 
break a computation down into its\n    constituent simple parts and then stitch 
them together via streams\n    to better utilize the compute resources of the 
cluster. Decide on a\n    per application basis the fine line between 
complexity of each\n    operator vs too many streams. Doing multiple 
computations in one\n    operator does save network I/O, while operators that 
are too complex\n    are hard to maintain.\n\n\nAs far as possible, avoid operators that 
depend on the order of two streams.\n    In such cases 
behavior is not idempotent.\n\n\nPersist key information to HDFS if possible; 
it may be useful\n    for debugging later.\n\n\nDecide on an appropriate fault 
tolerance mechanism. If some\n    data loss is acceptable, use the at-most-once 
mechanism as it has the\n    fastest recovery.\n\n\n\n\nCreating New 
Project\n\n\nPlease refer to the 
\nApex Application Packages\n\u00a0for\nthe basic steps for 
creating a new project.\n\n\nWriting the application code\n\n\nPreferably use 
an IDE (Eclipse, Netbeans etc.) that allows you to\nmanage dependencies and 
assists with the Java coding. Specific benefits\ninclude ease of managing 
operator library jar files, individual operator\nclasses, ports and properties. 
It will also highlight and help\nrectify issues such as type mismatches 
when adding streams as you\ntype.\n\n\nTesting\n\n\nWrite test cases with 
JUnit or a similar test framework so that code\nis tested as it is written. For 
such testing, the DAG can run in local\nmode within the IDE. Doing this may 
involve writing mock input or output\noperators for the integration points with 
external systems. For example,\ninstead of reading from a live data stream, the 
application in test mode\ncan read from and write to files. This can be done 
with a single\napplication DAG by instrumenting a test mode using settings 
in the\nconfiguration that is passed to the application 
factory\ninterface.\n\n\nGood test coverage will not only eliminate basic 
validation errors\nsuch as missing port connections or property constraint 
violations, but\nalso validate the correct processing of the data. The same 
tests can be\nre-run whenever the application or its dependencies change 
(operator\nlibraries, version of the platform etc.)\n\n\nRunning an 
application\n\n\nThe platform provides a commandline tool called dtcli\u00a0for 
managing applications (launching,\nkilling, viewing, etc.). This tool was 
already discussed above briefly\nin the section entitled Running the Test 
Application. It will introspect\nthe jar file specified with the launch command 
for applications (classes\nthat implement ApplicationFactory) or properties 
files that define\napplications. It will also deploy the dependency jar files 
from the\napplication package to the cluster.\n\n\nDtcli can run the 
application in local mode (i.e. outside a\ncluster). 
It is recommended to first run the application in local 
mode\nin the development environment before launching on the Hadoop 
cluster.\nThis way some of the external system integration and 
correct\nfunctionality of the application can be verified in an easier to 
debug\nenvironment before testing distributed mode.\n\n\nFor more details on 
CLI please refer to the \ndtCli Guide\n.\n\n\nApplication API\n\n\nThis section 
introduces the API to write a streaming application.\nThe work involves 
connecting operators via streams to form the logical\nDAG. The steps 
are\n\n\n\n\n\n\nInstantiate an application (DAG)\n\n\n\n\n\n\n(Optional) Set 
Attributes\n\n\n\n\nAssign application name\n\n\nSet any other attributes as 
per application requirements\n\n\n\n\n\n\n\n\nCreate/re-use and instantiate 
operators\n\n\n\n\nAssign operator name that is unique within the  
application\n\n\nDeclare schema upfront for each operator (and thereby its 
ports)\n\n\n(Optional) Set properties\u00a0 and attributes 
on the dag as per specification\n\n\nConnect ports of operators via 
streams\n\n\nEach stream connects one output port of an operator to one or  
more input ports of other operators.\n\n\n(Optional) Set attributes on the 
streams\n\n\n\n\n\n\n\n\n\n\n\n\nTest the application.\n\n\n\n\n\n\nThere are 
two methods to create an application, namely Java, and\nProperties file. Java 
API is for applications being developed by humans,\nand properties file (Hadoop 
like) is more suited for DAGs generated by\ntools.\n\n\nJava API\n\n\nThe Java 
API is the most common way to create a streaming\napplication. It is meant for 
application developers who prefer to\nleverage the features of Java, and the 
ease of use and enhanced\nproductivity provided by IDEs like NetBeans or 
Eclipse. Using Java to\nspecify the application provides the extra validation 
abilities of the Java\ncompiler, such as compile time checks for type safety at the 
time of\nwriting the code. Later in this chapter you can read more 
about\nvalidation support in the platform.\n\n\nThe developer specifies the 
streaming application by implementing\nthe ApplicationFactory interface, which 
is how platform tools (CLI etc.)\nrecognize and instantiate applications. Here 
we show how to create a\nYahoo! Finance application that streams the last trade 
price of a ticker\nand computes the high and low price in every 1 min window. 
Run above\n test application\u00a0to execute the\nDAG in local mode within the 
IDE.\n\n\nLet us revisit how the Yahoo! Finance test application constructs the 
DAG:\n\n\npublic class Application implements StreamingApplication\n{\n\n  
...\n\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  
{\n    
dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getStockTickInputOperator(\nStockTickInput\n, dag);\n 
   SumKeyVal\nString, Long\n dailyVolume = 
getDailyVolumeOperator(\nDailyVolume\n, dag);\n    ConsolidatorKeyVal\n
String,Double,Long,String,?,?\n quoteOperator = getQuoteOperator(\nQuote\n, 
dag);\n\n    RangeKeyVal\nString, Double\n highlow = 
getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    
SumKeyVal\nString, Long\n minuteVolume = 
getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = 
getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n 
priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, 
appWindowCountSMA);\n\n    dag.addStream(\nprice\n, tick.price, 
quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, 
tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, 
tick.time, quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, 
dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, 
quoteOperator.out, getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    
dag.addStream(\nhigh_low\n, highlow.range, chartOperator.in1);\n    
dag.addStream(\nvol_1min\n, minuteVolume.sum, 
chartOperator.in2);\n    dag.addStream(\nchart_data\n, chartOperator.out, 
getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    
dag.addStream(\nsma_price\n, priceSMA.doubleSMA, 
getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n  
}\n}\n\n\n\n\nJSON File DAG Specification\n\n\nIn addition to Java, you can 
also specify the DAG using JSON, provided the operators in the DAG are present 
in the dependency jars. Create src/main/resources/app directory under your app 
package project, and put your JSON files there. This is the specification of a 
JSON file that specifies an application.\n\n\nCreate a JSON file under 
src/main/resources/app, for example \nmyApplication.json\n\n\n{\n  
\ndescription\n: \n{application description}\n,\n  \noperators\n: [\n    {\n    
  \nname\n: \n{operator name}\n,\n      \nclass\n: \n{fully qualified class 
name of the operator}\n,\n      \nproperties\n: {\n        \n{property key}\n: 
\n{property value}\n,\n        ...\n      }\n    }, ...\n  ],\n  
\nstreams\n: [\n    {\n      \nname\n: \n{stream name}\n,\n      \nsource\n: 
{\n        \noperatorName\n: \n{source operator name}\n,\n        \nportName\n: 
\n{source operator output port name}\n\n      },\n      \nsinks\n: [\n        
{\n          \noperatorName\n: \n{sink operator name}\n,\n          
\nportName\n: \n{sink operator input port name}\n\n        }, ...\n      ]\n    
}, ...\n  ]\n}\n\n\n\n\n\n\n\nThe name of the JSON file is taken as the name of 
the application.\n\n\nThe \ndescription\n field is the description of the 
application and is optional.\n\n\nThe \noperators\n field is the list of 
operators the application has. You can specify the name, the Java class, and 
the properties of each operator here.\n\n\nThe \nstreams\n field is the list of 
streams that connects the operators together to form the DAG. Each stream 
consists of the stream name, the operator and port that it connects from, and 
the list of operators and ports that it connects to. Note that you can connect 
from \none\n output port of an operator to \nmultiple\n different input ports 
of different operators.\n\n\n\n\nIn Apex Malhar, there is an \nexample\n in the 
Pi Demo doing just that.\n\n\nProperties File DAG Specification\n\n\nThe 
platform also supports specification of a DAG via a properties\nfile. The aim 
here is to make it easy for tools to create and run an\napplication. This method 
of specification does not have the Java\ncompiler support of compile time 
check, but since these applications\nwould be created by software, they should 
be correct by construction.\nThe syntax is derived from Hadoop properties and 
should be easy for\nfolks who are used to creating software that integrates 
with\nHadoop.\n\n\nUnder the src/main/resources/app directory (create if it 
doesn't exist), create a properties file.\nFor example 
\nmyApplication.properties\n\n\n# input operator that reads from a 
file\ndt.operator.inputOp.classname=com.acme.SampleInputOperator\ndt.operator.inputOp.fileName=somefile.txt\n\n#
 output operator that writes to the 
console\ndt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# 
stream connecting both 
operators\ndt.stream.inputStream.source=inputOp.outputPort\ndt.stream.inputStream.sinks=outputOp.inputPort\n\n\n\n\nThe above
 snippet is intended to convey the basic idea of specifying\nthe DAG using a 
properties file. Operators would come from a predefined\nlibrary and be referenced 
in the specification by class name and port names\n(obtained from the library 
provider's documentation or runtime\nintrospection by tools). For those 
interested in details, see later\nsections and refer to the  Operation 
and\nInstallation Guide\u00a0mentioned above.\n\n\nAttributes\n\n\nAttributes 
impact the runtime behavior of the application. They do\nnot impact the 
functionality. An example of an attribute is application\nname. Setting it 
changes the application name. Another example is\nstreaming window size. 
Setting it changes the streaming window size from\nthe default 
value to the specified value. Users cannot add new\nattributes; they can only 
choose from the ones that come packaged and\npre-supported by the platform. 
Details of attributes are covered in the\n Operation and 
Installation\nGuide.\n\n\nOperators\n\n\nOperators\u00a0are basic compute 
units.\nOperators process each incoming tuple and emit zero or more tuples 
on\noutput ports as per the business logic. The data flow, connectivity,\nfault 
tolerance (node outage), etc. are taken care of by the platform. As\nan operator 
developer, all that is needed is to figure out what to do\nwith the incoming 
tuple and when (and on which output port) to send out a\nparticular output tuple. 
Correctly designed operators will most likely\nget reused. Operator design 
needs care and foresight. For details, refer\nto the  \nOperator Developer 
Guide\n. As an application developer you need to connect operators\nin a way 
that implements 
your business logic. You may also require\noperator customization for 
functionality and use attributes for\nperformance/scalability etc.\n\n\nAll 
operators process tuples asynchronously in a distributed\ncluster. An operator 
cannot assume or predict the exact time a tuple\nthat it emitted will get 
consumed by a downstream operator. An operator\nalso cannot predict the exact 
time when a tuple arrives from an upstream\noperator. The only guarantee is 
that the upstream operators are\nprocessing the current or a future window, 
i.e. the windowId of an upstream\noperator equals or exceeds its own windowId. 
Conversely, the windowId\nof a downstream operator is less than or equal to its 
own windowId. The\nend of a window operation, i.e. the API call to endWindow on 
an operator,\nrequires that all upstream operators have finished processing 
this\nwindow. This means that completion of processing a window propagates 
in\na blocking fashion through an operator. Later sections provide more\ndetails 
on streams and data flow of tuples.\n\n\nEach operator has a unique name 
within the DAG as provided by the\nuser. This is the name of the operator in 
the logical plan. The name of\nthe operator in the physical plan is an integer 
assigned to it by STRAM.\nThese integers use the sequence from 1 to N, 
where N is the total number\nof physically unique operators in the DAG. 
\u00a0Following the same rule,\neach partitioned instance of a logical operator 
has its own integer as\nan id. This id along with the Hadoop container name 
uniquely identifies\nthe operator in the execution plan of the DAG. The logical 
names and the\nphysical names are required for web service support. Operators 
can be\naccessed via both names. These same names are used while 
interacting\nwith  dtcli\u00a0to access an operator.\nIdeally these names 
should be self-descriptive. For example in Figure 1,\nthe node named 
\u201cDaily volume\u201d has a physical identifier of 2.\n\n\nOperator 
Interface\n\n\nOperator interface 
in a DAG consists of ports,\u00a0properties,\u00a0and 
attributes.\nOperators interact with other components of the DAG via ports. 
Functional behavior of the operators\ncan be customized via parameters. Run 
time performance and physical\ninstantiation are controlled by attributes. Ports 
and parameters are\nfields (variables) of the Operator class/object, while 
attributes are\nmeta information that is attached to the operator object via 
an\nAttributeMap. An operator must have at least one port. Properties 
are\noptional. Attributes are provided by the platform and always have 
a\ndefault value that enables normal functioning of 
operators.\n\n\nPorts\n\n\nPorts are connection points by which an operator 
receives and\nemits tuples. These should be transient objects instantiated in 
the\noperator object, that implement particular interfaces. Ports should 
be\ntransient as they contain no state. They have a pre-defined schema and\ncan 
only be connected to other ports with the same schema. 
An input port\nneeds to implement the interface  
Operator.InputPort\u00a0and\ninterface Sink. A default\nimplementation of these 
is provided by the abstract class DefaultInputPort. An output port needs 
to\nimplement the interface  Operator.OutputPort. A default implementation\nof 
this is provided by the concrete class DefaultOutputPort. These two are a quick 
way to\nimplement the above interfaces, but operator developers have the 
option\nof providing their own implementations.\n\n\nHere are examples of an 
input and an output port from the 
operator\nSum.\n\n\n@InputPortFieldAnnotation(name = \ndata\n)\npublic final 
transient DefaultInputPort\nV\n data = new DefaultInputPort\nV\n() {\n  
@Override\n  public void process(V tuple)\n  {\n    ...\n  
}\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient 
DefaultOutputPort\nV\n sum = new DefaultOutputPort\nV\n(){ \u2026 
};\n\n\n\n\nThe process call is in the Sink interface. An emit on an 
output\nport is done via emit(tuple) call. 
For the above example it would be\nsum.emit(t), where the type of t is 
the generic parameter V.\n\n\nThere is no limit on how many ports an operator 
can have. However\nany operator must have at least one port. An operator with 
only one port\nis called an Input Adapter if it has no input port and an Output 
Adapter\nif it has no output port. These are special operators needed to 
get/read\ndata from outside system/source into the application, or push/write 
data\ninto an outside system/sink. These could be in Hadoop or outside 
of\nHadoop. These two operators are in essence gateways for the 
streaming\napplication to communicate with systems outside the 
application.\n\n\nPort connectivity can be validated during compile time by 
adding\nPortFieldAnnotations shown above. By default all ports have to 
be\nconnected; to allow a port to go unconnected, you need to 
add\n\u201coptional=true\u201d to the annotation.\n\n\nAttributes can be 
specified for ports that affect the runtime\nbehavior. 
An example of an attribute is parallel partition that specifies\na parallel 
computation flow per partition. It is described in detail in\nthe Parallel 
Partitions section. Another example is queue capacity that specifies the buffer 
size for the\nport. Details of attributes are covered in  Operation and 
Installation Guide.\n\n\nProperties\n\n\nProperties are the abstractions by 
which functional behavior of an\noperator can be customized. They should be 
non-transient objects\ninstantiated in the operator object. They need to be 
non-transient since\nthey are part of the operator state and re-construction of 
the operator\nobject from its checkpointed state must restore the operator to 
the\ndesired state. Properties are optional, i.e. an operator may or may 
not\nhave properties; they are part of user code and their values are 
not\ninterpreted by the platform in any way.\n\n\nAll non-serializable objects 
should be declared transient.\nExamples include sockets, session information, 
etc. 
These objects should\nbe initialized during the setup call, which is called every 
time the\noperator is initialized.\n\n\nAttributes\n\n\nAttributes are values 
assigned to the operators that impact\nrun-time. This includes things like the 
number of partitions, at most\nonce or at least once or exactly once recovery 
modes, etc. Attributes do\nnot impact functionality of the operator. Users can 
change certain\nattributes at runtime. Users cannot add attributes to 
operators; they\nare pre-defined by the platform. They are interpreted by the 
platform\nand thus cannot be defined in user created code (like 
properties).\nDetails of attributes are covered in  \nConfiguration 
Guide\n.\n\n\nOperator State\n\n\nThe state of an operator is defined as the 
data that it transfers\nfrom one window to a future window. Since the computing 
model of the\nplatform is to treat windows like micro-batches, the operator 
state can\nbe checkpointed every Nth window, or every T units of time, where T 
is significantly greater\nthan the streaming window. 
\u00a0When an operator is 
checkpointed, the entire\nobject is written to HDFS. \u00a0The larger the 
amount of state in an\noperator, the longer it takes to recover from a failure. 
A stateless\noperator can recover much quicker than a stateful one. The 
needed\nwindows are preserved by the upstream buffer server and are used 
to\nrecompute the lost windows, and also rebuild the buffer server in 
the\ncurrent container.\n\n\nThe distinction between Stateless and Stateful is 
based solely on\nthe need to transfer data in the operator from one window to 
the next.\nThe state of an operator is independent of the number of 
ports.\n\n\nStateless\n\n\nA Stateless operator is defined as one where no data 
is needed to\nbe kept at the end of every window. This means that all the 
computations\nof a window can be derived from all the tuples the operator 
receives\nwithin that window. This guarantees that the output of any window can 
be\nreconstructed by simply replaying 
the tuples that arrived in that\nwindow. Stateless operators are more 
efficient in terms of fault\ntolerance, and cost to achieve 
SLA.\n\n\nStateful\n\n\nA Stateful operator is defined as one where data is 
needed to be\nstored at the end of a window for computations occurring in a 
later\nwindow; a common example is the computation of a sum of values in 
the\ninput tuples.\n\n\nOperator API\n\n\nThe Operator API consists of methods 
that operator developers may\nneed to override. In this section we will discuss 
the Operator APIs from\nthe point of view of an application developer. 
Knowledge of how an\noperator works internally is critical for writing an 
application. Those\ninterested in the details should refer to  Malhar Operator 
Developer Guide.\n\n\nThe APIs are available in three modes, namely Single 
Streaming\nWindow, Sliding Application Window, and Aggregate Application 
Window.\nThese are not mutually exclusive, i.e. an operator can use 
single\nstreaming window as well as 
sliding application window. A physical\ninstance of an operator is always 
processing tuples from a single\nwindow. The processing of tuples is guaranteed 
to be sequential, no\nmatter which input port the tuples arrive on.\n\n\nIn the 
later part of this section we will evaluate three common\nuses of streaming 
windows by applications. They have different\ncharacteristics and implications 
on optimization and recovery mechanisms\n(i.e. algorithm used to recover a node 
after outage) as discussed later\nin the section.\n\n\nStreaming 
Window\n\n\nA streaming window is an atomic micro-batch computation period. The 
API\nmethods relating to a streaming window are as follows\n\n\npublic void 
process(\ntuple_type\n tuple) // Called on the input port on which the tuple 
arrives\npublic void beginWindow(long windowId) // Called at the start of the 
window as soon as the first begin_window tuple arrives\npublic void endWindow() 
// Called at the end of the window after end_window tuples arrive on all 
input ports\npublic void setup(OperatorContext context) // Called once during 
initialization of the operator\npublic void teardown() // Called once when the 
operator is being shut down\n\n\n\n\nA tuple can be emitted in any of the three 
streaming run-time\ncalls, namely beginWindow, process, and endWindow but not 
in setup or\nteardown.\n\n\nAggregate Application Window\n\n\nAn operator with 
an aggregate window is stateful within the\napplication window timeframe and 
possibly stateless at the end of that\napplication window. The size of an 
aggregate application window is an\noperator attribute and is defined as a 
multiple of the streaming window\nsize. The platform recognizes this attribute 
and optimizes the operator.\nThe beginWindow and endWindow calls are not 
invoked for those streaming\nwindows that do not align with the application 
window. For example in\ncase of a streaming window of 0.5 second and an application 
window of 5\nminutes, an application window spans 600 streaming windows (5*60*2 =\n600). 
At the start of the sequence of these 600 atomic 
streaming\nwindows, a beginWindow gets invoked, and at the end of these 
600\nstreaming windows an endWindow gets invoked. All the 
intermediate\nstreaming windows do not invoke beginWindow or endWindow. 
Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off streaming 
windows.\nFor example if operators are being checkpointed say on an average 
every\n30th window, then the above application window would have about 
20\ncheckpoints.\n\n\nSliding Application Window\n\n\nA sliding window is 
a computation that requires the previous N\nstreaming windows. After each streaming 
window the Nth past window is\ndropped and the new window is added to the 
computation. An operator with a\nsliding window is a stateful operator at the end of 
any window. The sliding\nwindow period is an attribute and is a multiple of 
the streaming window. The\nplatform recognizes this attribute and leverages it 
during bookkeeping.\nA sliding aggregate window with 
tolerance to data loss does not have a\nvery high bookkeeping cost. The cost 
of all three recovery mechanisms,\n at most once\u00a0(data loss tolerant),\nat 
least once\u00a0(data loss\nintolerant), and exactly once\u00a0(data\nloss 
intolerant and no extra computations) is the same as for recovery\nmechanisms based on 
streaming window. STRAM is not able to leverage this\noperator for any extra 
optimization.\n\n\nSingle vs Multi-Input Operator\n\n\nA single-input operator 
by definition has a single upstream\noperator, since there can only be one 
writing port for a stream. \u00a0If an\noperator has a single upstream 
operator, then the beginWindow on the\nupstream also blocks the beginWindow of 
the single-input operator. For\nan operator to start processing any window at 
least one upstream\noperator has to start processing that window. A multi-input 
operator\nreads from more than one upstream ports. Such an operator would 
start\nprocessing as soon as the first begin_window event arrives. Howeve
 r the\nwindow would not close (i.e. invoke endWindow) till 

<TRUNCATED>
