http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/2ea69c2b/content/docs/apex/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/content/docs/apex/mkdocs/search_index.json 
b/content/docs/apex/mkdocs/search_index.json
deleted file mode 100644
index 34dd42f..0000000
--- a/content/docs/apex/mkdocs/search_index.json
+++ /dev/null
@@ -1,834 +0,0 @@
-{
-    "docs": [
-        {
-            "location": "/", 
-            "text": "Apache Apex (Incubating)\n\n\nApex is a Hadoop YARN 
native big data processing platform, enabling real time stream as well as batch 
processing for your big data.  Apex provides the following 
benefits:\n\n\n\n\nHigh scalability and performance\n\n\nFault tolerance and 
state management\n\n\nHadoop-native YARN \n HDFS implementation\n\n\nEvent 
processing guarantees\n\n\nSeparation of functional and operational 
concerns\n\n\nSimple API supports generic Java code\n\n\n\n\nPlatform has been 
demonstated to scale linearly across Hadoop clusters under extreme loads of 
billions of events per second.  Hardware and process failures are quickly 
recovered with HDFS-backed checkpointing and automatic operator recovery, 
preserving application state and resuming execution in seconds.  Functional and 
operational specifications are separated.  Apex provides a simple API, which 
enables users to write generic, reusable code.  The code is dropped in as-is 
and platform automatically h
 andles the various operational concerns, such as state management, fault 
tolerance, scalability, security, metrics, etc.  This frees users to focus on 
functional development, and lets platform provide operability support.\n\n\nThe 
core Apex platform is supplemented by Malhar, a library of connector and logic 
functions, enabling rapid application development.  These operators and modules 
provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, ActiveMQ, 
RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, Redis, 
HBase, CouchDB, generic JDBC, and other database connectors.  In addition to 
the operators, the library contains a number of demos applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit \nApex Malhar on 
Github\n\n\nFor additional information visit \nApache Apex (incubating)\n.", 
-            "title": "Apache Apex"
-        }, 
-        {
-            "location": "/#apache-apex-incubating", 
-            "text": "Apex is a Hadoop YARN native big data processing 
platform, enabling real time stream as well as batch processing for your big 
data.  Apex provides the following benefits:   High scalability and performance 
 Fault tolerance and state management  Hadoop-native YARN   HDFS implementation 
 Event processing guarantees  Separation of functional and operational concerns 
 Simple API supports generic Java code   Platform has been demonstated to scale 
linearly across Hadoop clusters under extreme loads of billions of events per 
second.  Hardware and process failures are quickly recovered with HDFS-backed 
checkpointing and automatic operator recovery, preserving application state and 
resuming execution in seconds.  Functional and operational specifications are 
separated.  Apex provides a simple API, which enables users to write generic, 
reusable code.  The code is dropped in as-is and platform automatically handles 
the various operational concerns, such as state management
 , fault tolerance, scalability, security, metrics, etc.  This frees users to 
focus on functional development, and lets platform provide operability support. 
 The core Apex platform is supplemented by Malhar, a library of connector and 
logic functions, enabling rapid application development.  These operators and 
modules provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, 
ActiveMQ, RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, 
Redis, HBase, CouchDB, generic JDBC, and other database connectors.  In 
addition to the operators, the library contains a number of demos applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit  Apex Malhar on Github  
For additional information visit  Apache Apex (incubating) .", 
-            "title": "Apache Apex (Incubating)"
-        }, 
-        {
-            "location": "/apex_development_setup/", 
-            "text": "Apache Apex Development Environment Setup\n\n\nThis 
document discusses the steps needed for setting up a development environment 
for creating applications that run on the Apache Apex 
platform.\n\n\nDevelopment Tools\n\n\nThere are a few tools that will be 
helpful when developing Apache Apex applications, including:\n\n\n\n\n\n\ngit\n 
- A revision control system (version 1.7.1 or later). There are multiple git 
clients available for Windows (\nhttp://git-scm.com/download/win\n for 
example), so download and install a client of your choice.\n\n\n\n\n\n\njava 
JDK\n (not JRE) - Includes the Java Runtime Environment as well as the Java 
compiler and a variety of tools (version 1.7.0_79 or later). Can be downloaded 
from the Oracle website.\n\n\n\n\n\n\nmaven\n - Apache Maven is a build system 
for Java projects (version 3.0.5 or later). It can be downloaded from 
\nhttps://maven.apache.org/download.cgi\n.\n\n\n\n\n\n\nIDE\n (Optional) - If 
you prefer to use an IDE (Integra
 ted Development Environment) such as \nNetBeans\n, \nEclipse\n or 
\nIntelliJ\n, install that as well.\n\n\n\n\n\n\nAfter installing these tools, 
make sure that the directories containing the executable files are in your PATH 
environment variable.\n\n\n\n\nWindows\n - Open a console window and enter the 
command \necho %PATH%\n to see the value of the \nPATH\n variable and verify 
that the above directories for Java, git, and maven executables are present.  
JDK executables like \njava\n and \njavac\n, the directory might be something 
like \nC:\\\\Program Files\\\\Java\\\\jdk1.7.0\\_80\\\\bin\n; for \ngit\n it 
might be \nC:\\\\Program Files\\\\Git\\\\bin\n; and for maven it might be 
\nC:\\\\Users\\\\user\\\\Software\\\\apache-maven-3.3.3\\\\bin\n.  If not, you 
can change its value clicking on the button at \nControl Panel\n \n \nAdvanced 
System Settings\n \n \nAdvanced tab\n \n \nEnvironment Variables\n.\n\n\nLinux 
and Mac\n - Open a console/terminal window and enter the command \necho 
 $PATH\n to see the value of the \nPATH\n variable and verify that the above 
directories for Java, git, and maven executables are present.  If not, make 
sure software is downloaded and installed, and optionally PATH reference is 
added and exported  in a \n~/.profile\n or \n~/.bash_profile\n.  For example to 
add maven located in \n/sfw/maven/apache-maven-3.3.3\n to PATH add the line: 
\nexport PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin\n\n\n\n\nConfirm by 
running the following commands and comparing with output that show in the table 
below:\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nCommand\n\n\nOutput\n\n\n\n\n\n\njavac 
-version\n\n\njavac 1.7.0_80\n\n\n\n\n\n\njava -version\n\n\njava version 
\n1.7.0_80\n\n\nJava(TM) SE Runtime Environment (build 1.7.0_80-b15)\n\n\nJava 
HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)\n\n\n\n\n\n\ngit 
--version\n\n\ngit version 2.6.1.windows.1\n\n\n\n\n\n\nmvn 
--version\n\n\nApache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 
2015-04-22T06
 :57:37-05:00)\n\n\n...\n\n\n\n\n\n\n\n\n\n\n\nCreating New Apex 
Project\n\n\nAfter development tools are configured, you can now use the maven 
archetype to create a basic Apache Apex project.  \nNote:\n When executing the 
commands below, replace \n3.3.0-incubating\n by \nlatest available version\n of 
Apache Apex.\n\n\n\n\n\n\nWindows\n - Create a new Windows command file called 
\nnewapp.cmd\n by copying the lines below, and execute it.  When you run this 
file, the properties will be displayed and you will be prompted with \nY: :\n; 
just press \nEnter\n to complete the project generation.  The caret (^) at the 
end of some lines indicates that a continuation line follows. \n\n\n@echo 
off\n@rem Script for creating a new application\nsetlocal\nmvn 
archetype:generate ^\n -DarchetypeGroupId=org.apache.apex ^\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.3.0-incubating 
^\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp ^\n -Dversion=1.0-SNAPS
 HOT\nendlocal\n\n\n\n\n\n\n\nLinux\n - Execute the lines below in a terminal 
window.  New project will be created in the curent working directory.  The 
backslash (\\) at the end of the lines indicates continuation.\n\n\nmvn 
archetype:generate \\\n -DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating 
\\\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT\n\n\n\n\n\n\n\nWhen the run 
completes successfully, you should see a new directory named \nmyapexapp\n 
containing a maven project for building a basic Apache Apex application. It 
includes 3 source files:\nApplication.java\n,  \nRandomNumberGenerator.java\n 
and \nApplicationTest.java\n. You can now build the application by stepping 
into the new directory and running the maven package command:\n\n\ncd 
myapexapp\nmvn clean package -DskipTests\n\n\n\nThe build should create the 
application package file \nmyapexapp/target
 /myapexapp-1.0-SNAPSHOT.apa\n. This application package can then be used to 
launch example application via \ndtCli\n, or other visual management tools.  
When running, this application will generate a stream of random numbers and 
print them out, each prefixed by the string \nhello world:\n.\n\n\nBuilding 
Apex Demos\n\n\nIf you want to see more substantial Apex demo applications and 
the associated source code, you can follow these simple steps to check out and 
build them.\n\n\n\n\n\n\nCheck out the source code repositories:\n\n\ngit clone 
https://github.com/apache/incubator-apex-core\ngit clone 
https://github.com/apache/incubator-apex-malhar\n\n\n\n\n\n\n\nSwitch to the 
appropriate release branch and build each repository:\n\n\ncd 
incubator-apex-core\nmvn clean install -DskipTests\n\ncd 
incubator-apex-malhar\nmvn clean install -DskipTests\n\n\n\n\n\n\n\nThe 
\ninstall\n argument to the \nmvn\n command installs resources from each 
project to your local maven repository (typically \n.m2/
 repository\n under your home directory), and \nnot\n to the system 
directories, so Administrator privileges are not required. The  \n-DskipTests\n 
argument skips running unit tests since they take a long time. If this is a 
first-time installation, it might take several minutes to complete because 
maven will download a number of associated plugins.\n\n\nAfter the build 
completes, you should see the demo application package files in the target 
directory under each demo subdirectory in 
\nincubator-apex-malhar/demos\n.\n\n\nSandbox\n\n\nTo jump start development 
with an Apache Hadoop single node cluster, \nDataTorrent Sandbox\n powered by 
VirtualBox is available on Windows, Linux, or Mac platforms.  The sandbox is 
configured by default to run with 6GB RAM; if your development machine has 16GB 
or more, you can increase the sandbox RAM to 8GB or more using the VirtualBox 
console.  This will yield better performance and support larger applications.  
The advantage of developing in the sandb
 ox is that most of the tools (e.g. \njdk\n, \ngit\n, \nmaven\n), Hadoop YARN 
and HDFS, and a distribution of Apache Apex and DataTorrent RTS are 
pre-installed.  The disadvantage is that the sandbox is a memory-limited 
environment, and requires settings changes and restarts to adjust memory 
available for development and testing.", 
-            "title": "Development Setup"
-        }, 
-        {
-            "location": 
"/apex_development_setup/#apache-apex-development-environment-setup", 
-            "text": "This document discusses the steps needed for setting up a 
development environment for creating applications that run on the Apache Apex 
platform.", 
-            "title": "Apache Apex Development Environment Setup"
-        }, 
-        {
-            "location": "/apex_development_setup/#development-tools", 
-            "text": "There are a few tools that will be helpful when 
developing Apache Apex applications, including:    git  - A revision control 
system (version 1.7.1 or later). There are multiple git clients available for 
Windows ( http://git-scm.com/download/win  for example), so download and 
install a client of your choice.    java JDK  (not JRE) - Includes the Java 
Runtime Environment as well as the Java compiler and a variety of tools 
(version 1.7.0_79 or later). Can be downloaded from the Oracle website.    
maven  - Apache Maven is a build system for Java projects (version 3.0.5 or 
later). It can be downloaded from  https://maven.apache.org/download.cgi .    
IDE  (Optional) - If you prefer to use an IDE (Integrated Development 
Environment) such as  NetBeans ,  Eclipse  or  IntelliJ , install that as well. 
   After installing these tools, make sure that the directories containing the 
executable files are in your PATH environment variable.   Windows  - Open a 
console window and
  enter the command  echo %PATH%  to see the value of the  PATH  variable and 
verify that the above directories for Java, git, and maven executables are 
present.  JDK executables like  java  and  javac , the directory might be 
something like  C:\\\\Program Files\\\\Java\\\\jdk1.7.0\\_80\\\\bin ; for  git  
it might be  C:\\\\Program Files\\\\Git\\\\bin ; and for maven it might be  
C:\\\\Users\\\\user\\\\Software\\\\apache-maven-3.3.3\\\\bin .  If not, you can 
change its value clicking on the button at  Control Panel     Advanced System 
Settings     Advanced tab     Environment Variables .  Linux and Mac  - Open a 
console/terminal window and enter the command  echo $PATH  to see the value of 
the  PATH  variable and verify that the above directories for Java, git, and 
maven executables are present.  If not, make sure software is downloaded and 
installed, and optionally PATH reference is added and exported  in a  
~/.profile  or  ~/.bash_profile .  For example to add maven located in  /sf
 w/maven/apache-maven-3.3.3  to PATH add the line:  export 
PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin   Confirm by running the following 
commands and comparing with output that show in the table below:         
Command  Output    javac -version  javac 1.7.0_80    java -version  java 
version  1.7.0_80  Java(TM) SE Runtime Environment (build 1.7.0_80-b15)  Java 
HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)    git --version  
git version 2.6.1.windows.1    mvn --version  Apache Maven 3.3.3 
(7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T06:57:37-05:00)  ...", 
-            "title": "Development Tools"
-        }, 
-        {
-            "location": "/apex_development_setup/#creating-new-apex-project", 
-            "text": "After development tools are configured, you can now use 
the maven archetype to create a basic Apache Apex project.   Note:  When 
executing the commands below, replace  3.3.0-incubating  by  latest available 
version  of Apache Apex.    Windows  - Create a new Windows command file called 
 newapp.cmd  by copying the lines below, and execute it.  When you run this 
file, the properties will be displayed and you will be prompted with  Y: : ; 
just press  Enter  to complete the project generation.  The caret (^) at the 
end of some lines indicates that a continuation line follows.   @echo off\n@rem 
Script for creating a new application\nsetlocal\nmvn archetype:generate ^\n 
-DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype 
-DarchetypeVersion=3.3.0-incubating ^\n -DgroupId=com.example 
-Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n 
-Dversion=1.0-SNAPSHOT\nendlocal    Linux  - Execute the lines below in a 
terminal window.  New projec
 t will be created in the curent working directory.  The backslash (\\) at the 
end of the lines indicates continuation.  mvn archetype:generate \\\n 
-DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating 
\\\n -DgroupId=com.example -Dpackage=com.example.myapexapp 
-DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT    When the run completes 
successfully, you should see a new directory named  myapexapp  containing a 
maven project for building a basic Apache Apex application. It includes 3 
source files: Application.java ,   RandomNumberGenerator.java  and  
ApplicationTest.java . You can now build the application by stepping into the 
new directory and running the maven package command:  cd myapexapp\nmvn clean 
package -DskipTests  The build should create the application package file  
myapexapp/target/myapexapp-1.0-SNAPSHOT.apa . This application package can then 
be used to launch example application via  dtCli , or other visual 
 management tools.  When running, this application will generate a stream of 
random numbers and print them out, each prefixed by the string  hello world: 
.", 
-            "title": "Creating New Apex Project"
-        }, 
-        {
-            "location": "/apex_development_setup/#building-apex-demos", 
-            "text": "If you want to see more substantial Apex demo 
applications and the associated source code, you can follow these simple steps 
to check out and build them.    Check out the source code repositories:  git 
clone https://github.com/apache/incubator-apex-core\ngit clone 
https://github.com/apache/incubator-apex-malhar    Switch to the appropriate 
release branch and build each repository:  cd incubator-apex-core\nmvn clean 
install -DskipTests\n\ncd incubator-apex-malhar\nmvn clean install -DskipTests  
  The  install  argument to the  mvn  command installs resources from each 
project to your local maven repository (typically  .m2/repository  under your 
home directory), and  not  to the system directories, so Administrator 
privileges are not required. The   -DskipTests  argument skips running unit 
tests since they take a long time. If this is a first-time installation, it 
might take several minutes to complete because maven will download a number of 
associated plugins.  A
 fter the build completes, you should see the demo application package files in 
the target directory under each demo subdirectory in  
incubator-apex-malhar/demos .", 
-            "title": "Building Apex Demos"
-        }, 
-        {
-            "location": "/apex_development_setup/#sandbox", 
-            "text": "To jump start development with an Apache Hadoop single 
node cluster,  DataTorrent Sandbox  powered by VirtualBox is available on 
Windows, Linux, or Mac platforms.  The sandbox is configured by default to run 
with 6GB RAM; if your development machine has 16GB or more, you can increase 
the sandbox RAM to 8GB or more using the VirtualBox console.  This will yield 
better performance and support larger applications.  The advantage of 
developing in the sandbox is that most of the tools (e.g.  jdk ,  git ,  maven 
), Hadoop YARN and HDFS, and a distribution of Apache Apex and DataTorrent RTS 
are pre-installed.  The disadvantage is that the sandbox is a memory-limited 
environment, and requires settings changes and restarts to adjust memory 
available for development and testing.", 
-            "title": "Sandbox"
-        }, 
-        {
-            "location": "/application_development/", 
-            "text": "Application Developer Guide\n\n\nThe Apex platform is 
designed to process massive amounts of\nreal-time events natively in Hadoop.  
It runs as a YARN (Hadoop 2.x) \napplication and leverages Hadoop as a 
distributed operating\nsystem.  All the basic distributed operating system 
capabilities of\nHadoop like resource management (YARN), distributed file 
system (HDFS),\nmulti-tenancy, security, fault-tolerance, and scalability are 
supported natively \nin all the Apex applications. \u00a0The platform handles 
all the details of the application \nexecution, including dynamic scaling, 
state checkpointing and recovery, event \nprocessing guarantees, etc. allowing 
you to focus on writing your application logic without\nmixing operational and 
functional concerns.\n\n\nIn the platform, building a streaming application can 
be extremely\neasy and intuitive. \u00a0The application is represented as a 
Directed\nAcyclic Graph (DAG) of computation units called \nOperators\n interco
 nnected\nby the data-flow edges called  \nStreams\n.\u00a0The operators 
process input\nstreams and produce output streams. A library of common 
operators is\nprovided to enable quick application development. \u00a0In case 
the desired\nprocessing is not available in the Operator Library, one can 
easily\nwrite a custom operator. We refer those interested in creating their 
own\noperators to the \nOperator Development Guide\n.\n\n\nRunning A Test 
Application\n\n\nIf you are starting with the Apex platform for the first 
time,\nit can be informative to launch an existing application and see it 
run.\nOne of the simplest examples provided in \nApex-Malhar repository\n is a 
Pi demo application,\nwhich computes the value of PI using random numbers.  
After \nsetting up development environment\n\nPi demo can be launched as 
follows:\n\n\n\n\nOpen up Apex Malhar files in your IDE (for example Eclipse, 
IntelliJ, NetBeans, etc)\n\n\nNavigate to 
\ndemos/pi/src/test/java/com/datatorrent/demos/Applicat
 ionTest.java\n\n\nRun the test for ApplicationTest.java\n\n\nView the output 
in system console\n\n\n\n\nCongratulations, you just ran your first real-time 
streaming demo :) \nThis demo is very simple and has four operators. The first 
operator\nemits random integers between 0 to 30, 000. The second operator 
receives\nthese coefficients and emits a hashmap with x and y values each time 
it\nreceives two values. The third operator takes these values and 
computes\nx**2+y**2. The last operator counts how many computed values 
from\nthe previous operator were less than or equal to 30, 000**2. 
Assuming\nthis count is N, then PI is computed as N/number of values 
received.\nHere is the code snippet for the PI application. This code populates 
the\nDAG. Do not worry about what each line does, we will cover these\nconcepts 
later in this document.\n\n\n// Generates random numbers\nRandomEventGenerator 
rand = dag.addOperator(\nrand\n, new 
RandomEventGenerator());\nrand.setMinvalue(0);\nrand.setMaxv
 alue(30000);\n\n// Generates a round robin HashMap of \nx\n and 
\ny\n\nRoundRobinHashMap\nString,Object\n rrhm = dag.addOperator(\nrrhm\n, new 
RoundRobinHashMap\nString, Object\n());\nrrhm.setKeys(new String[] { \nx\n, 
\ny\n });\n\n// Calculates pi from x and y\nJavaScriptOperator calc = 
dag.addOperator(\npicalc\n, new 
Script());\ncalc.setPassThru(false);\ncalc.put(\ni\n,0);\ncalc.put(\ncount\n,0);\ncalc.addSetupScript(\nfunction
 pi() { if (x*x+y*y \n= \n+maxValue*maxValue+\n) { i++; } count++; return i / 
count * 4; }\n);\ncalc.setInvoke(\npi\n);\ndag.addStream(\nrand_rrhm\n, 
rand.integer_data, rrhm.data);\ndag.addStream(\nrrhm_calc\n, rrhm.map, 
calc.inBindings);\n\n// puts results on system console\nConsoleOutputOperator 
console = dag.addOperator(\nconsole\n, new 
ConsoleOutputOperator());\ndag.addStream(\nrand_console\n,calc.result, 
console.input);\n\n\n\n\nYou can review the other demos and see what they do. 
The examples\ngiven in the Demos project cover various features of the pl
 atform and we\nstrongly encourage you to read these to familiarize yourself 
with the\nplatform. In the remaining part of this document we will go 
through\ndetails needed for you to develop and run streaming applications 
in\nMalhar.\n\n\nTest Application: Yahoo! Finance Quotes\n\n\nThe 
PI\u00a0application was to\nget you started. It is a basic application and does 
not fully illustrate\nthe features of the platform. For the purpose of 
describing concepts, we\nwill consider the test application shown in Figure 1. 
The application\ndownloads tick data from  \nYahoo! Finance\n \u00a0and 
computes the\nfollowing for four tickers, namely \nIBM\n,\n\nGOOG\n, 
\nYHOO\n.\n\n\n\n\nQuote: Consisting of last trade price, last trade time, 
and\n    total volume for the day\n\n\nPer-minute chart data: Highest trade 
price, lowest trade\n    price, and volume during that minute\n\n\nSimple 
Moving Average: trade price over 5 minutes\n\n\n\n\nTotal volume must ensure 
that all trade volume for that day is\
 nadded, i.e. data loss would result in wrong results. Charting data needs\nall 
the trades in the same minute to go to the same slot, and then on it\nstarts 
afresh, so again data loss would result in wrong results. The\naggregation for 
charting data is done over 1 minute. Simple moving\naverage computes the 
average price over a 5 minute sliding window; it\ntoo would produce wrong 
results if there is data loss. Figure 1 shows\nthe application with no 
partitioning.\n\n\n\n\nThe operator 
StockTickerInput:\u00a0StockTickerInput\n\u00a0\nis\nthe input operator that 
reads live data from Yahoo! Finance once per\ninterval (user configurable in 
milliseconds), and emits the price, the\nincremental volume, and the last trade 
time of each stock symbol, thus\nemulating real ticks from the exchange. 
\u00a0We utilize the Yahoo! Finance\nCSV web service interface. \u00a0For 
example:\n\n\n$ GET 
'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1'\n\nIBM\n,203.966,1513041,\n
 
1:43pm\n\n\nGOOG\n,762.68,1879741,\n1:43pm\n\n\nAAPL\n,444.3385,11738366,\n1:43pm\n\n\nYHOO\n,19.3681,14707163,\n1:43pm\n\n\n\n\n\nAmong
 all the operators in Figure 1, StockTickerInput is the only\noperator that 
requires extra code because it contains a custom mechanism\nto get the input 
data. \u00a0Other operators are used unchanged from the\nMalhar 
library.\n\n\nHere is the class implementation for StockTickInput:\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
au.com.bytecode.opencsv.CSVReader;\nimport 
com.datatorrent.annotation.OutputPortFieldAnnotation;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DefaultOutputPort;\nimport 
com.datatorrent.api.InputOperator;\nimport 
com.datatorrent.lib.util.KeyValPair;\nimport java.io.IOException;\nimport 
java.io.InputStream;\nimport java.io.InputStreamReader;\nimport 
java.util.*;\nimport org.apache.commons.httpclient.HttpClient;\nimport 
org.apache.commons.httpclient.HttpStatus;\nimport org.apache.
 commons.httpclient.cookie.CookiePolicy;\nimport 
org.apache.commons.httpclient.methods.GetMethod;\nimport 
org.apache.commons.httpclient.params.DefaultHttpParams;\nimport 
org.slf4j.Logger;\nimport org.slf4j.LoggerFactory;\n\n/**\n * This operator 
sends price, volume and time into separate ports and calculates incremental 
volume.\n */\npublic class StockTickInput implements InputOperator\n{\n  
private static final Logger logger = 
LoggerFactory.getLogger(StockTickInput.class);\n  /**\n   * Timeout interval 
for reading from server. 0 or negative indicates no timeout.\n   */\n  public 
int readIntervalMillis = 500;\n  /**\n   * The URL of the web service resource 
for the POST request.\n   */\n  private String url;\n  public String[] 
symbols;\n  private transient HttpClient client;\n  private transient GetMethod 
method;\n  private HashMap\nString, Long\n lastVolume = new HashMap\nString, 
Long\n();\n  private boolean outputEvenIfZeroVolume = false;\n  /**\n   * The 
output port to emit price.
 \n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final 
transient DefaultOutputPort\nKeyValPair\nString, Double\n price = new 
DefaultOutputPort\nKeyValPair\nString, Double\n();\n  /**\n   * The output port 
to emit incremental volume.\n   */\n  @OutputPortFieldAnnotation(optional = 
true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, Long\n 
volume = new DefaultOutputPort\nKeyValPair\nString, Long\n();\n  /**\n   * The 
output port to emit last traded time.\n   */\n  
@OutputPortFieldAnnotation(optional = true)\n  public final transient 
DefaultOutputPort\nKeyValPair\nString, String\n time = new 
DefaultOutputPort\nKeyValPair\nString, String\n();\n\n  /**\n   * Prepare URL 
from symbols and parameters. URL will be something like: 
http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1\n 
  *\n   * @return the URL\n   */\n  private String prepareURL()\n  {\n    
String str = \nhttp://download.finance.yahoo.com/d/quotes.csv?s=\n;\n    for 
 (int i = 0; i \n symbols.length; i++) {\n      if (i != 0) {\n        str += 
\n,\n;\n      }\n      str += symbols[i];\n    }\n    str += 
\nf=sl1vt1\ne=.csv\n;\n    return str;\n  }\n\n  @Override\n  public void 
setup(OperatorContext context)\n  {\n    url = prepareURL();\n    client = new 
HttpClient();\n    method = new GetMethod(url);\n    
DefaultHttpParams.getDefaultParams().setParameter(\nhttp.protocol.cookie-policy\n,
 CookiePolicy.BROWSER_COMPATIBILITY);\n  }\n\n  @Override\n  public void 
teardown()\n  {\n  }\n\n  @Override\n  public void emitTuples()\n  {\n\n    try 
{\n      int statusCode = client.executeMethod(method);\n      if (statusCode 
!= HttpStatus.SC_OK) {\n        System.err.println(\nMethod failed: \n + 
method.getStatusLine());\n      }\n      else {\n        InputStream istream = 
method.getResponseBodyAsStream();\n        // Process response\n        
InputStreamReader isr = new InputStreamReader(istream);\n        CSVReader 
reader = new CSVReader(isr);\n        Lis
 t\nString[]\n myEntries = reader.readAll();\n        for (String[] stringArr: 
myEntries) {\n          ArrayList\nString\n tuple = new 
ArrayList\nString\n(Arrays.asList(stringArr));\n          if (tuple.size() != 
4) {\n            return;\n          }\n          // input csv is 
\nSymbol\n,\nPrice\n,\nVolume\n,\nTime\n\n          String symbol = 
tuple.get(0);\n          double currentPrice = Double.valueOf(tuple.get(1));\n  
        long currentVolume = Long.valueOf(tuple.get(2));\n          String 
timeStamp = tuple.get(3);\n          long vol = currentVolume;\n          // 
Sends total volume in first tick, and incremental volume afterwards.\n          
if (lastVolume.containsKey(symbol)) {\n            vol -= 
lastVolume.get(symbol);\n          }\n\n          if (vol \n 0 || 
outputEvenIfZeroVolume) {\n            price.emit(new KeyValPair\nString, 
Double\n(symbol, currentPrice));\n            volume.emit(new 
KeyValPair\nString, Long\n(symbol, vol));\n            time.emit(new KeyValPair
 \nString, String\n(symbol, timeStamp));\n            lastVolume.put(symbol, 
currentVolume);\n          }\n        }\n      }\n      
Thread.sleep(readIntervalMillis);\n    }\n    catch (InterruptedException ex) 
{\n      logger.debug(ex.toString());\n    }\n    catch (IOException ex) {\n    
  logger.debug(ex.toString());\n    }\n  }\n\n  @Override\n  public void 
beginWindow(long windowId)\n  {\n  }\n\n  @Override\n  public void 
endWindow()\n  {\n  }\n\n  public void setOutputEvenIfZeroVolume(boolean 
outputEvenIfZeroVolume)\n  {\n       this.outputEvenIfZeroVolume = 
outputEvenIfZeroVolume;\n  }\n\n}\n\n\n\n\nThe operator has three output ports 
that emit the price of the\nstock, the volume of the stock and the last trade 
time of the stock,\ndeclared as public member variables price, volume\u00a0and  
time\u00a0of the class. \u00a0The tuple of the\nprice\u00a0output port is a 
key-value\npair with the stock symbol being the key, and the price being the 
value.\n\u00a0The tuple of the volume
 \u00a0output\nport is a key value pair with the stock symbol being the key, 
and the\nincremental volume being the value. \u00a0The tuple of the  
time\u00a0output port is a key value pair with the\nstock symbol being the key, 
and the last trade time being the\nvalue.\n\n\nImportant: Since operators will 
be\nserialized, all input and output ports need to be declared 
transient\nbecause they are stateless and should not be serialized.\n\n\nThe 
method\u00a0setup(OperatorContext)\ncontains the code that is necessary for 
setting up the HTTP\nclient for querying Yahoo! 
Finance.\n\n\nMethod\u00a0emitTuples() contains\nthe code that reads from 
Yahoo! Finance, and emits the data to the\noutput ports of the operator. 
\u00a0emitTuples()\u00a0will be called one or more times\nwithin one 
application window as long as time is allowed within the\nwindow.\n\n\nNote 
that we want to emulate the tick input stream by having\nincremental volume 
data with Yahoo! Finance data. \u00a0We therefore subtract\nt
 he previous volume from the current volume to emulate incremental\nvolume for 
each tick.\n\n\nThe operator\nDailyVolume:\u00a0This operator\nreads from the 
input port, which contains the incremental volume tuples\nfrom StockTickInput, 
and\naggregates the data to provide the cumulative volume. \u00a0It uses 
the\nlibrary class  SumKeyVal\nK,V\n\u00a0provided in math\u00a0package. 
\u00a0In this case,\nSumKeyVal\nString,Long\n, where K is the stock symbol, V 
is the\naggregated volume, with cumulative\nset to true. (Otherwise if  
cumulativewas set to false, SumKeyVal would\nprovide the sum for the 
application window.) \u00a0Malhar provides a number\nof built-in operators for 
simple operations like this so that\napplication developers do not have to 
write them. \u00a0More examples to\nfollow. This operator assumes that the 
application restarts before the\nmarket opens every day.\n\n\nThe operator 
Quote:\nThis operator has three input ports, which are price 
(from\nStockTickInput), daily_vo
 l (from\nDaily Volume), and time (from\n StockTickInput). \u00a0This 
operator\njust consolidates the three data items and and emits the 
consolidated\ndata. \u00a0It utilizes the class 
ConsolidatorKeyVal\nK\n\u00a0from the\nstream\u00a0package.\n\n\nThe operator 
HighLow:\u00a0This operator reads from the input port,\nwhich contains the 
price tuples from StockTickInput, and provides the high and the\nlow price 
within the application window. \u00a0It utilizes the library class\n 
RangeKeyVal\nK,V\n\u00a0provided\nin the math\u00a0package. In this 
case,\nRangeKeyVal\nString,Double\n.\n\n\nThe operator MinuteVolume:\nThis 
operator reads from the input port, which contains the\nvolume tuples from 
StockTickInput,\nand aggregates the data to provide the sum of the volume 
within one\nminute. \u00a0Like the operator  DailyVolume, this operator also 
uses\nSumKeyVal\nString,Long\n, but\nwith cumulative set to false. 
\u00a0The\nApplication Window is set to one minute. We will explain how to set t
 his\nlater.\n\n\nThe operator Chart:\nThis operator is very similar to the 
operator Quote, except that it takes inputs from\nHigh Low\u00a0and  Minute 
Vol\u00a0and outputs the consolidated tuples\nto the output port.\n\n\nThe 
operator PriceSMA:\nSMA stands for - Simple Moving Average. It reads from 
the\ninput port, which contains the price tuples from StockTickInput, 
and\nprovides the moving average price of the stock. \u00a0It 
utilizes\nSimpleMovingAverage\nString,Double\n, which is provided in the\n 
multiwindow\u00a0package.\nSimpleMovingAverage keeps track of the data of the 
previous N\napplication windows in a sliding manner. \u00a0For each end window 
event, it\nprovides the average of the data in those application 
windows.\n\n\nThe operator Console:\nThis operator just outputs the input 
tuples to the console\n(or stdout). \u00a0In this example, there are four 
console\u00a0operators, which connect to the output\nof  Quote, Chart, PriceSMA 
and VolumeSMA. \u00a0In\npractice, they 
 should be replaced by operators that use the data to\nproduce visualization 
artifacts like charts.\n\n\nConnecting the operators together and constructing 
the\nDAG:\u00a0Now that we know the\noperators used, we will create the DAG, 
set the streaming window size,\ninstantiate the operators, and connect the 
operators together by adding\nstreams that connect the output ports with the 
input ports among those\noperators. \u00a0This code is in the file  
YahooFinanceApplication.java. Refer to Figure 1\nagain for the graphical 
representation of the DAG. \u00a0The last method in\nthe code, namely 
getApplication(),\ndoes all that. \u00a0The rest of the methods are just for 
setting up the\noperators.\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
com.datatorrent.api.ApplicationFactory;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DAG;\nimport 
com.datatorrent.api.Operator.InputPort;\nimport 
com.datatorrent.lib.io.ConsoleOutputOperator;\nimport com
 .datatorrent.lib.math.RangeKeyVal;\nimport 
com.datatorrent.lib.math.SumKeyVal;\nimport 
com.datatorrent.lib.multiwindow.SimpleMovingAverage;\nimport 
com.datatorrent.lib.stream.ConsolidatorKeyVal;\nimport 
com.datatorrent.lib.util.HighLow;\nimport 
org.apache.hadoop.conf.Configuration;\n\n/**\n * Yahoo! Finance application 
demo. \np\n\n *\n * Get Yahoo finance feed and calculate minute price range, 
minute volume, simple moving average of 5 minutes.\n */\npublic class 
Application implements StreamingApplication\n{\n  private int 
streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)\n  
private int appWindowCountMinute = 60;   // 1 minute\n  private int 
appWindowCountSMA = 5 * 60;  // 5 minute\n\n  /**\n   * Get actual Yahoo 
finance ticks of symbol, last price, total daily volume, and last traded 
price.\n   */\n  public StockTickInput getStockTickInputOperator(String name, 
DAG dag)\n  {\n    StockTickInput oper = dag.addOperator(name, 
StockTickInput.class);\n    oper.readI
 ntervalMillis = 200;\n    return oper;\n  }\n\n  /**\n   * This sends total 
daily volume by adding volumes from each ticks.\n   */\n  public 
SumKeyVal\nString, Long\n getDailyVolumeOperator(String name, DAG dag)\n  {\n   
 SumKeyVal\nString, Long\n oper = dag.addOperator(name, new SumKeyVal\nString, 
Long\n());\n    oper.setType(Long.class);\n    oper.setCumulative(true);\n    
return oper;\n  }\n\n  /**\n   * Get aggregated volume of 1 minute and send at 
the end window of 1 minute.\n   */\n  public SumKeyVal\nString, Long\n 
getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)\n  {\n    
SumKeyVal\nString, Long\n oper = dag.addOperator(name, new SumKeyVal\nString, 
Long\n());\n    oper.setType(Long.class);\n    
oper.setEmitOnlyWhenChanged(true);\ndag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    return oper;\n  }\n\n  /**\n   * Get High-low range for 1 minute.\n   */\n  
public RangeKeyVal\nString, Double\n getHighLow
 Operator(String name, DAG dag, int appWindowCount)\n  {\n    
RangeKeyVal\nString, Double\n oper = dag.addOperator(name, new 
RangeKeyVal\nString, Double\n());\n    
dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    oper.setType(Double.class);\n    return oper;\n  }\n\n  /**\n   * Quote 
(Merge price, daily volume, time)\n   */\n  public 
ConsolidatorKeyVal\nString,Double,Long,String,?,?\n getQuoteOperator(String 
name, DAG dag)\n  {\n    ConsolidatorKeyVal\nString,Double,Long,String,?,?\n 
oper = dag.addOperator(name, new 
ConsolidatorKeyVal\nString,Double,Long,String,Object,Object\n());\n    return 
oper;\n  }\n\n  /**\n   * Chart (Merge minute volume and minute high-low)\n   
*/\n  public ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n 
getChartOperator(String name, DAG dag)\n  {\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n oper = dag.addOperator(name, 
new ConsolidatorKeyVal\nString,HighLow,Long,Object,Object,Object\n());\n 
    return oper;\n  }\n\n  /**\n   * Get simple moving average of price.\n   
*/\n  public SimpleMovingAverage\nString, Double\n 
getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)\n 
 {\n    SimpleMovingAverage\nString, Double\n oper = dag.addOperator(name, new 
SimpleMovingAverage\nString, Double\n());\n    
oper.setWindowSize(appWindowCount);\n    oper.setType(Double.class);\n    
return oper;\n  }\n\n  /**\n   * Get console for output.\n   */\n  public 
InputPort\nObject\n getConsole(String name, /*String nodeName,*/ DAG dag, 
String prefix)\n  {\n    ConsoleOutputOperator oper = dag.addOperator(name, 
ConsoleOutputOperator.class);\n    oper.setStringFormat(prefix + \n: %s\n);\n   
 return oper.input;\n  }\n\n  /**\n   * Create Yahoo Finance Application DAG.\n 
  */\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  
{\n    
dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getSto
 ckTickInputOperator(\nStockTickInput\n, dag);\n    SumKeyVal\nString, Long\n 
dailyVolume = getDailyVolumeOperator(\nDailyVolume\n, dag);\n    
ConsolidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = 
getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow 
= getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    
SumKeyVal\nString, Long\n minuteVolume = 
getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = 
getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n 
priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, 
appWindowCountSMA);\n       DefaultPartitionCodec\nString, Double\n codec = new 
DefaultPartitionCodec\nString, Double\n();\n    
dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);\n    
dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);\n    
dag.addStream(\nprice\n, tick
 .price, quoteOperator.in1, highlow.data, priceSMA.data);\n    
dag.addStream(\nvol\n, tick.volume, dailyVolume.data, minuteVolume.data);\n    
dag.addStream(\ntime\n, tick.time, quoteOperator.in3);\n    
dag.addStream(\ndaily_vol\n, dailyVolume.sum, quoteOperator.in2);\n\n    
dag.addStream(\nquote_data\n, quoteOperator.out, getConsole(\nquoteConsole\n, 
dag, \nQUOTE\n));\n\n    dag.addStream(\nhigh_low\n, highlow.range, 
chartOperator.in1);\n    dag.addStream(\nvol_1min\n, minuteVolume.sum, 
chartOperator.in2);\n    dag.addStream(\nchart_data\n, chartOperator.out, 
getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    
dag.addStream(\nsma_price\n, priceSMA.doubleSMA, 
getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  
}\n\n}\n\n\n\n\nNote that we also set a user-specific sliding window for SMA 
that\nkeeps track of the previous N data points. \u00a0Do not confuse this with 
the\nattribute APPLICATION_WINDOW_COUNT.\n\n\nIn the rest of this chapter we 
will run through th
 e process of\nrunning this application. We assume that \u00a0you are familiar 
with details\nof your Hadoop infrastructure. For installation\ndetails please 
refer to the \nInstallation Guide\n.\n\n\nRunning a Test Application\n\n\nWe 
will now describe how to run the yahoo\nfinance application\u00a0described 
above in different modes\n(local mode, single node on Hadoop, and multi-nodes 
on Hadoop).\n\n\nThe platform runs streaming applications under the control of 
a\nlight-weight Streaming Application Manager (STRAM). Each application 
has\nits own instance of STRAM. STRAM launches the application and\ncontinually 
provides run time monitoring, analysis, and takes action\nsuch as load scaling 
or outage recovery as needed. \u00a0We will discuss\nSTRAM in more detail in 
the next chapter.\n\n\nThe instructions below assume that the platform was 
installed in a\ndirectory \nINSTALL_DIR\n and the command line interface (CLI) 
will\nbe used to launch the demo application. An application can be ru
 n in\nlocal mode\u00a0(in IDE or from command line) or on a Hadoop 
cluster.\n\n\nTo start the dtCli run\n\n\nINSTALL_DIR\n/bin/dtcli\n\n\n\nThe 
command line prompt appears.  To start the application in local mode (the 
actual version number in the file name may differ)\n\n\ndt\n launch -local 
\nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo terminate the 
application in local mode, enter Ctrl-C\n\n\nTu run the application on the 
Hadoop cluster (the actual version\nnumber in the file name may 
differ)\n\n\ndt\n launch 
\nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo stop the 
application running in Hadoop, terminate it in the dtCli:\n\n\ndt\n 
kill-app\n\n\n\nExecuting the application in either mode includes the 
following\nsteps. At a top level, STRAM (Streaming Application Manager) 
validates\nthe application (DAG), translates the logical plan to the physical 
plan\nand then launches the execution engine. The mode determines 
the\nresources needed and how how t
 hey are used.\n\n\nLocal Mode\n\n\nIn local mode, the application is run as a 
single-process\u00a0with multiple threads. Although a\nfew Hadoop classes are 
needed, there is no dependency on a Hadoop\ncluster or Hadoop services. The 
local file system is used in place of\nHDFS. This mode allows a quick run of an 
application in a single process\nsandbox, and hence is the most suitable to 
debug and analyze the\napplication logic. This mode is recommended for 
developing the\napplication and can be used for running applications within the 
IDE for\nfunctional testing purposes. Due to limited resources and lack 
\u00a0of\nscalability an application running in this single process mode is 
more\nlikely to encounter throughput bottlenecks. A distributed cluster 
is\nrecommended for benchmarking and production testing.\n\n\nHadoop 
Cluster\n\n\nIn this section we discuss various Hadoop cluster 
setups.\n\n\nSingle Node Cluster\n\n\nIn a single node Hadoop cluster all 
services are deployed on a\nsing
 le server (a developer can use his/her development machine as a\nsingle node 
cluster). The platform does not distinguish between a single\nor multi-node 
setup and behaves exactly the same in both cases.\n\n\nIn this mode, the 
resource manager, name node, data node, and node\nmanager occupy one process 
each. This is an example of running a\nstreaming application as a 
multi-process\u00a0application on the same server.\nWith prevalence of fast, 
multi-core systems, this mode is effective for\ndebugging, fine tuning, and 
generic analysis before submitting the job\nto a larger Hadoop cluster. In this 
mode, execution uses the Hadoop\nservices and hence is likely to identify 
issues that are related to the\nHadoop environment (such issues will not be 
uncovered in local mode).\nThe throughput will obviously not be as high as on a 
multi-node Hadoop\ncluster. Additionally, since each container (i.e. Java 
process) requires\na significant amount of memory, you will be able to run a 
much smaller\n
 number of containers than on a multi-node cluster.\n\n\nMulti-Node 
Cluster\n\n\nIn a multi-node Hadoop cluster all the services of Hadoop 
are\ntypically distributed across multiple nodes in a production 
or\nproduction-level test environment. Upon launch the application 
is\nsubmitted to the Hadoop cluster and executes as a  multi-processapplication 
on\u00a0multiple nodes.\n\n\nBefore you start deploying, testing and 
troubleshooting your\napplication on a cluster, you should ensure that Hadoop 
(version 2.2.0\nor later)\u00a0is properly installed and\nyou have basic skills 
for working with it.\n\n\n\n\nApache Apex Platform Overview\n\n\nStreaming 
Computational Model\n\n\nIn this chapter, we describe the the basics of the 
real-time streaming platform and its computational model.\n\n\nThe platform is 
designed to enable completely asynchronous real time computations\u00a0done in 
as unblocked a way as possible with\nminimal overhead .\n\n\nApplications 
running in the platform are represent
 ed by a Directed\nAcyclic Graph (DAG) made up of \u00a0operators and streams. 
All computations\nare done in memory on arrival of\nthe input data, with an 
option to save the output to disk (HDFS) in a\nnon-blocking way. The data that 
flows between operators consists of\natomic data elements. Each data element 
along with its type definition\n(henceforth called  schema) is\ncalled a 
tuple.\u00a0An application is a\ndesign of the flow of these tuples to and 
from\nthe appropriate compute units to enable the computation of the 
final\ndesired results.\u00a0A message queue (henceforth called\n\u00a0buffer 
server) manages tuples streaming\nbetween compute units in different 
processes.This server keeps track of\nall consumers, publishers, partitions, 
and enables replay. More\ninformation is given in later section.\n\n\nThe 
streaming application is monitored by a decision making entity\ncalled STRAM 
(streaming application\nmanager).\u00a0STRAM is designed to be a light 
weight\ncontroller that 
 has minimal but sufficient interaction with the\napplication. This is done via 
periodic heartbeats. The\nSTRAM does the initial launch and periodically 
analyzes the system\nmetrics to decide if any run time action needs to be 
taken.\n\n\nA fundamental building block for the streaming platform\nis the 
concept of breaking up a stream into equal finite time slices\ncalled streaming 
windows. Each window contains the ordered\nset of tuples in that time slice. A 
typical duration of a window is 500\nms, but can be configured per application 
(the Yahoo! Finance\napplication configures this value in the  
properties.xml\u00a0file to be 1000ms = 1s). Each\nwindow is preceded by a 
begin_window\u00a0event and is terminated by an\nend_window\u00a0event, and is 
assigned\na unique window ID. Even though the platform performs computations 
at\nthe tuple level, bookkeeping is done at the window boundary, making 
the\ncomputations within a window an atomic event in the platform. \u00a0We 
can\nthink of e
 ach window as an  atomic\nmicro-batch\u00a0of tuples, to be processed together 
as one\natomic operation (See Figure 2). \u00a0\n\n\nThis atomic batching 
allows the platform to avoid the very steep\nper tuple bookkeeping cost and 
instead has a manageable per batch\nbookkeeping cost. This translates to higher 
throughput, low recovery\ntime, and higher scalability. Later in this document 
we illustrate how\nthe atomic micro-batch concept allows more efficient 
optimization\nalgorithms.\n\n\nThe platform also has in-built support 
for\napplication windows.\u00a0 An application window is part of 
the\napplication specification, and can be a small or large multiple of 
the\nstreaming window. \u00a0An example from our Yahoo! Finance test 
application\nis the moving average, calculated over a sliding application 
window of 5\nminutes which equates to 300 (= 5 * 60) streaming 
windows.\n\n\nNote that these two window concepts are distinct. \u00a0A 
streaming\nwindow is an abstraction of many tuples i
 nto a higher atomic event for\neasier management. \u00a0An application window 
is a group of consecutive\nstreaming windows used for data aggregation (e.g. 
sum, average, maximum,\nminimum) on a per operator level.\n\n\n\n\nAlongside 
the platform,\u00a0a set of\npredefined, benchmarked standard library operator 
templates is provided\nfor ease of use and rapid development of 
application.\u00a0These\noperators are open sourced to Apache Software 
Foundation under the\nproject name \u201cMalhar\u201d as part of our efforts to 
foster community\ninnovation. These operators can be used in a DAG as is, while 
others\nhave properties\u00a0that can be set to specify the\ndesired 
computation. Those interested in details, should refer to\n\nApex-Malhar 
operator library\n.\n\n\nThe platform is a Hadoop YARN native\napplication. It 
runs in a Hadoop cluster just like any\nother YARN application (MapReduce etc.) 
and is designed to seamlessly\nintegrate with rest of Hadoop technology stack. 
It leverage
 s Hadoop as\nmuch as possible and relies on it as its distributed operating 
system.\nHadoop dependencies include resource management, 
compute/memory/network\nallocation, HDFS, security, fault tolerance, 
monitoring, metrics,\nmulti-tenancy, logging etc. Hadoop classes/concepts are 
reused as much\nas possible.  The aim is to enable enterprises\nto leverage 
their existing Hadoop infrastructure for real time streaming\napplications. The 
platform is designed to scale with big\ndata applications and scale with 
Hadoop.\n\n\nA streaming application is an asynchronous execution 
of\ncomputations across distributed nodes. All computations are done 
in\nparallel on a distributed cluster. The computation model is designed to\ndo 
as many parallel computations as possible in a non-blocking fashion.\nThe task 
of monitoring of the entire application is done on (streaming)\nwindow 
boundaries with a streaming window as an atomic entity. A window\ncompletion is 
a quantum of work done. There is no assump
 tion that an\noperator can be interrupted at precisely a particular tuple or 
window.\n\n\nAn operator itself also\ncannot assume or predict the exact time a 
tuple that it emitted would\nget consumed by downstream operators. The operator 
processes the tuples\nit gets and simply emits new tuples based on its business 
logic. The\nonly guarantee it has is that the upstream operators are 
processing\neither the current or some later window, and the downstream 
operator is\nprocessing either the current or some earlier window. The 
completion of\na window (i.e. propagation of the  end_window\u00a0event through 
an operator) in any\noperator guarantees that all upstream operators have 
finished processing\nthis window. Thus, the end_window\u00a0event is blocking 
on an operator\nwith multiple outputs, and is a synchronization point in the 
DAG. The\n begin_window\u00a0event does not have\nany such restriction, a 
single begin_window\u00a0event from any upstream operator\ntriggers the 
operator to s
 tart processing tuples.\n\n\nStreaming Application Manager 
(STRAM)\n\n\nStreaming Application Manager (STRAM) is the Hadoop YARN 
native\napplication master. STRAM is the first process that is activated 
upon\napplication launch and orchestrates the streaming application on 
the\nplatform. STRAM is a lightweight controller process. The\nresponsibilities 
of STRAM include\n\n\n\n\n\n\nRunning the Application\n\n\n\n\nRead 
the\u00a0logical plan\u00a0of the application (DAG) submitted by the 
client\n\n\nValidate the logical plan\n\n\nTranslate the logical plan into a 
physical plan, where certain operators may  be partitioned (i.e. replicated) to 
multiple operators for  handling load.\n\n\nRequest resources (Hadoop 
containers) from Resource Manager,\n    per physical plan\n\n\nBased on 
acquired resources and application attributes, create\n    an execution 
plan\u00a0by partitioning the DAG into fragments,\n    each assigned to 
different containers.\n\n\nExecutes the application by deploying
  each fragment to\n    its container. Containers then start stream processing 
and run\n    autonomously, processing one streaming window after another. 
Each\n    container is represented as an instance of the  
StreamingContainer\u00a0class, which updates\n    STRAM via the heartbeat 
protocol and processes directions received\n    from 
STRAM.\n\n\n\n\n\n\n\n\nContinually monitoring the application via heartbeats 
from each StreamingContainer\n\n\n\n\nCollecting Application System Statistics 
and Logs\n\n\nLogging all application-wide decisions taken\n\n\nProviding 
system data on the state of the application via a  Web 
Service.\n\n\n\n\nSupporting Fault Tolerance\n\n\na.  Detecting a node 
outage\nb.  Requesting a replacement resource from the Resource Manager\n    
and scheduling state restoration for the streaming operators\nc.  Saving state 
to Zookeeper\n\n\n\n\n\n\nSupporting Dynamic Partitioning:\u00a0Periodically 
evaluating the SLA and modifying the physical plan if required\n    (l
 ogical plan does not change).\n\n\n\n\nEnabling Security:\u00a0Distributing 
security tokens for distributed components of the execution engine\n    and 
securing web service requests.\n\n\nEnabling Dynamic modification of DAG: In 
the future, we intend to allow for user initiated\n    modification of the 
logical plan to allow for changes to the\n    processing logic and 
functionality.\n\n\n\n\nAn example of the Yahoo! Finance Quote application 
scheduled on a\ncluster of 5 Hadoop containers (processes) is shown in Figure 
3.\n\n\n\n\nAn example for the translation from a logical plan to a 
physical\nplan and an execution plan for a subset of the application is shown 
in\nFigure 4.\n\n\n\n\nHadoop Components\n\n\nIn this section we cover some 
aspects of Hadoop that your\nstreaming application interacts with. This section 
is not meant to\neducate the reader on Hadoop, but just get the reader 
acquainted with\nthe terms. We strongly advise readers to learn Hadoop from 
other\nsources.\n\n\nA s
 treaming application runs as a native Hadoop 2.2 application.\nHadoop 2.2 does 
not differentiate between a map-reduce job and other\napplications, and hence 
as far as Hadoop is concerned, the streaming\napplication is just another job. 
This means that your application\nleverages all the bells and whistles Hadoop 
provides and is fully\nsupported within Hadoop technology stack. The platform 
is responsible\nfor properly integrating itself with the relevant components of 
Hadoop\nthat exist today and those that may emerge in the future\n\n\nAll 
investments that leverage multi-tenancy (for example quotas\nand queues), 
security (for example kerberos), data flow integration (for\nexample copying 
data in-out of HDFS), monitoring, metrics collections,\netc. will require no 
changes when streaming applications run 
on\nHadoop.\n\n\nYARN\n\n\nYARN\nis\nthe core library of Hadoop 2.2 that is 
tasked with resource management\nand works as a distributed application 
framework. In this section we\nwill
  walk through Yarn's components. In Hadoop 2.2, the old jobTracker\nhas been 
replaced by a combination of ResourceManager (RM) and\nApplicationMaster 
(AM).\n\n\nResource Manager (RM)\n\n\nResourceManager\n(RM)\nmanages all the 
distributed resources. It allocates and arbitrates all\nthe slots and the 
resources (cpu, memory, network) of these slots. It\nworks with per-node 
NodeManagers (NMs) and per-application\nApplicationMasters (AMs). Currently 
memory usage is monitored by RM; in\nupcoming releases it will have CPU as well 
as network management. RM is\nshared by map-reduce and streaming applications. 
Running streaming\napplications requires no changes in the RM.\n\n\nApplication 
Master (AM)\n\n\nThe AM is the watchdog or monitoring process for your 
application\nand has the responsibility of negotiating resources with RM 
and\ninteracting with NodeManagers to get the allocated containers 
started.\nThe AM is the starting point of your application and is considered 
user\ncode (not syst
 em Hadoop code). The AM itself runs in one container. All\nresource management 
within the application are managed by the AM. This\nis a critical feature for 
Hadoop 2.2 where tasks done by jobTracker in\nHadoop 1.0 have been distributed 
allowing Hadoop 2.2 to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN 
ApplicationManager.\n\n\nNode Managers (NM)\n\n\nThere is one 
\nNodeManager\n(NM)\nper node in the cluster. All the containers (i.e. 
processes) on that\nnode are monitored by the NM. It takes instructions from RM 
and manages\nresources of that node as per RM instructions. NMs interactions 
are same\nfor map-reduce and for streaming applications. Running 
streaming\napplications requires no changes in the NM.\n\n\nRPC 
Protocol\n\n\nCommunication among RM, AM, and NM is done via the Hadoop 
RPC\nprotocol. Streaming applications use the same protocol to send 
their\ndata. No changes are needed in RPC support provided by Hadoop to 
enable\ncommunication done by components of your appl
 ication.\n\n\nHDFS\n\n\nHadoop includes a highly fault tolerant, high 
throughput\ndistributed file system (\nHDFS\n).\nIt runs on commodity hardware, 
and your streaming application will, by\ndefault, use it. There is no 
difference between files created by a\nstreaming application and those created 
by map-reduce.\n\n\nDeveloping An Application\n\n\nIn this chapter we describe 
the methodology to develop an\napplication using the Realtime Streaming 
Platform. The platform was\ndesigned to make it easy to build and launch 
sophisticated streaming\napplications with the developer having to deal only 
with the\napplication/business logic. The platform deals with details of where 
to\nrun what operators on which servers and how to correctly route streams\nof 
data among them.\n\n\nDevelopment Process\n\n\nWhile the platform does not 
mandate a specific methodology or set\nof development tools, we have 
recommendations to maximize productivity\nfor the different phases of 
application development.\
 n\n\nDesign\n\n\n\n\nIdentify common, reusable operators. Use a library\n    
if possible.\n\n\nIdentify scalability and performance requirements before\n    
designing the DAG.\n\n\nLeverage attributes that the platform supports for 
scalability\n    and performance.\n\n\nUse operators that are benchmarked and 
tested so that later\n    surprises are minimized. If you have glue code, 
create appropriate\n    unit tests for it.\n\n\nUse THREAD_LOCAL locality for 
high throughput streams. If all\n    the operators on that stream cannot fit in 
one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n  
  NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completly. The 
former uses intra-process communication to also avoid\n    
serialization-deserialization overhead.\n\n\nThe overall throughput and 
latencies are are not necessarily\n    correlated to the number of operators in 
a simple way -- the\n    relationship is more nuanced. A lot depends on how 
much wor
 k\n    individual operators are doing, how many are able to operate in\n    
parallel, and how much data is flowing through the arcs of the DAG.\n    It is, 
at times, better to break a computation down into its\n    constituent simple 
parts and then stitch them together via streams\n    to better utilize the 
compute resources of the cluster. Decide on a\n    per application basis the 
fine line between complexity of each\n    operator vs too many streams. Doing 
multiple computations in one\n    operator does save network I/O, while 
operators that are too complex\n    are hard to maintain.\n\n\nDo not use 
operators that depend on the order of two streams\n    as far as possible. In 
such cases behavior is not idempotent.\n\n\nPersist key information to HDFS if 
possible; it may be useful\n    for debugging later.\n\n\nDecide on an 
appropriate fault tolerance mechanism. If some\n    data loss is acceptable, 
use the at-most-once mechanism as it has\n    fastest 
recovery.\n\n\n\n\nCreating 
 New Project\n\n\nPlease refer to the \nApex Application 
Packages\n\u00a0for\nthe basic steps for creating a new project.\n\n\nWriting 
the application code\n\n\nPreferably use an IDE (Eclipse, Netbeans etc.) that 
allows you to\nmanage dependencies and assists with the Java coding. Specific 
benefits\ninclude ease of managing operator library jar files, individual 
operator\nclasses, ports and properties. It will also highlight and assist 
to\nrectify issues such as type mismatches when adding streams 
while\ntyping.\n\n\nTesting\n\n\nWrite test cases with JUnit or similar test 
framework so that code\nis tested as it is written. For such testing, the DAG 
can run in local\nmode within the IDE. Doing this may involve writing mock 
input or output\noperators for the integration points with external systems. 
For example,\ninstead of reading from a live data stream, the application in 
test mode\ncan read from and write to files. This can be done with a 
single\napplication DAG by instrumenting a
  test mode using settings in the\nconfiguration that is passed to the 
application factory\ninterface.\n\n\nGood test coverage will not only eliminate 
basic validation errors\nsuch as missing port connections or property 
constraint violations, but\nalso validate the correct processing of the data. 
The same tests can be\nre-run whenever the application or its dependencies 
change (operator\nlibraries, version of the platform etc.)\n\n\nRunning an 
application\n\n\nThe platform provides a commandline tool called dtcli\u00a0for 
managing applications (launching,\nkilling, viewing, etc.). This tool was 
already discussed above briefly\nin the section entitled Running the Test 
Application. It will introspect\nthe jar file specified with the launch command 
for applications (classes\nthat implement ApplicationFactory) or property files 
that define\napplications. It will also deploy the dependency jar files from 
the\napplication package to the cluster.\n\n\nDtcli can run the application in 
local
  mode (i.e. outside a\ncluster). It is recommended to first run the 
application in local mode\nin the development environment before launching on 
the Hadoop cluster.\nThis way some of the external system integration and 
correct\nfunctionality of the application can be verified in an easier to 
debug\nenvironment before testing distributed mode.\n\n\nFor more details on 
CLI please refer to the \ndtCli Guide\n.\n\n\nApplication API\n\n\nThis section 
introduces the API to write a streaming application.\nThe work involves 
connecting operators via streams to form the logical\nDAG. The steps 
are\n\n\n\n\n\n\nInstantiate an application (DAG)\n\n\n\n\n\n\n(Optional) Set 
Attributes\n\n\n\n\nAssign application name\n\n\nSet any other attributes as 
per application requirements\n\n\n\n\n\n\n\n\nCreate/re-use and instantiate 
operators\n\n\n\n\nAssign operator name that is unique within the  
application\n\n\nDeclare schema upfront for each operator (and thereby its 
ports)\n\n\n(Optional) Set prope
 rties\u00a0 and attributes on the dag as per specification\n\n\nConnect ports 
of operators via streams\n\n\nEach stream connects one output port of an 
operator to one or  more input ports of other operators.\n\n\n(Optional) Set 
attributes on the streams\n\n\n\n\n\n\n\n\n\n\n\n\nTest the 
application.\n\n\n\n\n\n\nThere are two methods to create an application, 
namely Java, and\nProperties file. Java API is for applications being developed 
by humans,\nand properties file (Hadoop like) is more suited for DAGs generated 
by\ntools.\n\n\nJava API\n\n\nThe Java API is the most common way to create a 
streaming\napplication. It is meant for application developers who prefer 
to\nleverage the features of Java, and the ease of use and 
enhanced\nproductivity provided by IDEs like NetBeans or Eclipse. Using Java 
to\nspecify the application provides extra validation abilities of 
Java\ncompiler, such as compile time checks for type safety at the time 
of\nwriting the code. Later in this chapter you 
 can read more about\nvalidation support in the platform.\n\n\nThe developer 
specifies the streaming application by implementing\nthe ApplicationFactory 
interface, which is how platform tools (CLI etc.)\nrecognize and instantiate 
applications. Here we show how to create a\nYahoo! Finance application that 
streams the last trade price of a ticker\nand computes the high and low price 
in every 1 min window. Run above\n test application\u00a0to execute the\nDAG in 
local mode within the IDE.\n\n\nLet us revisit how the Yahoo! Finance test 
application constructs the DAG:\n\n\npublic class Application implements 
StreamingApplication\n{\n\n  ...\n\n  @Override\n  public void populateDAG(DAG 
dag, Configuration conf)\n  {\n    
dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getStockTickInputOperator(\nStockTickInput\n, dag);\n 
   SumKeyVal\nString, Long\n dailyVolume = 
getDailyVolumeOperator(\nDailyVolume\n, dag);\n    Co
 nsolidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = 
getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow 
= getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    
SumKeyVal\nString, Long\n minuteVolume = 
getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = 
getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n 
priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, 
appWindowCountSMA);\n\n    dag.addStream(\nprice\n, tick.price, 
quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, 
tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, 
tick.time, quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, 
dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, 
quoteOperator.out, getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    
dag.addStream(\nhigh_low\n,
  highlow.range, chartOperator.in1);\n    dag.addStream(\nvol_1min\n, 
minuteVolume.sum, chartOperator.in2);\n    dag.addStream(\nchart_data\n, 
chartOperator.out, getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    
dag.addStream(\nsma_price\n, priceSMA.doubleSMA, 
getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  
}\n}\n\n\n\n\nProperty File API\n\n\nThe platform also supports specification 
of a DAG via a property\nfile. The aim here to make it easy for tools to create 
and run an\napplication. This method of specification does not have the 
Java\ncompiler support of compile time check, but since these 
applications\nwould be created by software, they should be correct by 
construction.\nThe syntax is derived from Hadoop properties and should be easy 
for\nfolks who are used to creating software that integrated 
with\nHadoop.\n\n\nCreate an application (DAG): myApplication.properties\n\n\n# 
input operator that reads from a file\ndt.operator.inputOp.classname=com.ac
 me.SampleInputOperator\ndt.operator.inputOp.fileName=somefile.txt\n\n# output 
operator that writes to the 
console\ndt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# 
stream connecting both 
operators\ndt.stream.inputStream.source=inputOp.outputPort\ndt.stream.inputStream.sinks=outputOp.inputPort\n\n\n\n\nAbove
 snippet is intended to convey the basic idea of specifying\nthe DAG without 
using Java. Operators would come from a predefined\nlibrary and referenced in 
the specification by class name and port names\n(obtained from the library 
providers documentation or runtime\nintrospection by tools). For those 
interested in details, see later\nsections and refer to the  Operation 
and\nInstallation Guide\u00a0mentioned above.\n\n\nAttributes\n\n\nAttributes 
impact the runtime behavior of the application. They do\nnot impact the 
functionality. An example of an attribute is application\nname. Setting it 
changes the application name. Another example is\nstreaming window size. S
 etting it changes the streaming window size from\nthe default value to the 
specified value. Users cannot add new\nattributes, they can only choose from 
the ones that come packaged and\npre-supported by the platform. Details of 
attributes are covered in the\n Operation and 
Installation\nGuide.\n\n\nOperators\n\n\nOperators\u00a0are basic compute 
units.\nOperators process each incoming tuple and emit zero or more tuples 
on\noutput ports as per the business logic. The data flow, connectivity,\nfault 
tolerance (node outage), etc. is taken care of by the platform. As\nan operator 
developer, all that is needed is to figure out what to do\nwith the incoming 
tuple and when (and which output port) to send out a\nparticular output tuple. 
Correctly designed operators will most likely\nget reused. Operator design 
needs care and foresight. For details, refer\nto the  \nOperator Developer 
Guide\n. As an application developer you need to connect operators\nin a way 
that it implements your business
  logic. You may also require\noperator customization for functionality and use 
attributes for\nperformance/scalability etc.\n\n\nAll operators process tuples 
asynchronously in a distributed\ncluster. An operator cannot assume or predict 
the exact time a tuple\nthat it emitted will get consumed by a downstream 
operator. An operator\nalso cannot predict the exact time when a tuple arrives 
from an upstream\noperator. The only guarantee is that the upstream operators 
are\nprocessing the current or a future window, i.e. the windowId of 
upstream\noperator is equals or exceeds its own windowId. Conversely the 
windowId\nof a downstream operator is less than or equals its own windowId. 
The\nend of a window operation, i.e. the API call to endWindow on an 
operator\nrequires that all upstream operators have finished processing 
this\nwindow. This means that completion of processing a window propagates 
in\na blocking fashion through an operator. Later sections provides 
more\ndetails on streams an
 d data flow of tuples.\n\n\nEach operator has a unique name within the DAG as 
provided by the\nuser. This is the name of the operator in the logical plan. 
The name of\nthe operator in the physical plan is an integer assigned to it by 
STRAM.\nThese integers are use the sequence from 1 to N, where N is total 
number\nof physically unique operators in the DAG. \u00a0Following the same 
rule,\neach partitioned instance of a logical operator has its own integer 
as\nan id. This id along with the Hadoop container name uniquely 
identifies\nthe operator in the execution plan of the DAG. The logical names 
and the\nphysical names are required for web service support. Operators can 
be\naccessed via both names. These same names are used while interacting\nwith  
dtcli\u00a0to access an operator.\nIdeally these names should be 
self-descriptive. For example in Figure 1,\nthe node named \u201cDaily 
volume\u201d has a physical identifier of 2.\n\n\nOperator 
Interface\n\n\nOperator interface in a DAG co
 nsists of ports,\u00a0properties,\u00a0and attributes.\nOperators interact 
with other components of the DAG via ports. Functional behavior of the 
operators\ncan be customized via parameters. Run time performance and 
physical\ninstantiation is controlled by attributes. Ports and parameters 
are\nfields (variables) of the Operator class/object, while attributes 
are\nmeta information that is attached to the operator object via 
an\nAttributeMap. An operator must have at least one port. Properties 
are\noptional. Attributes are provided by the platform and always have 
a\ndefault value that enables normal functioning of 
operators.\n\n\nPorts\n\n\nPorts are connection points by which an operator 
receives and\nemits tuples. These should be transient objects instantiated in 
the\noperator object, that implement particular interfaces. Ports should 
be\ntransient as they contain no state. They have a pre-defined schema and\ncan 
only be connected to other ports with the same schema. An input port\n
 needs to implement the interface  Operator.InputPort\u00a0and\ninterface Sink. 
A default\nimplementation of these is provided by the abstract class 
DefaultInputPort. An output port needs to\nimplement the interface  
Operator.OutputPort. A default implementation\nof this is provided by the 
concrete class DefaultOutputPort. These two are a quick way to\nimplement the 
above interfaces, but operator developers have the option\nof providing their 
own implementations.\n\n\nHere are examples of an input and an output port from 
the operator\nSum.\n\n\n@InputPortFieldAnnotation(name = \ndata\n)\npublic 
final transient DefaultInputPort\nV\n data = new DefaultInputPort\nV\n() {\n  
@Override\n  public void process(V tuple)\n  {\n    ...\n  
}\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient 
DefaultOutputPort\nV\n sum = new DefaultOutputPort\nV\n(){ \u2026 
};\n\n\n\n\nThe process call is in the Sink interface. An emit on an 
output\nport is done via emit(tuple) call. For the a
 bove example it would be\nsum.emit(t), where the type of t is the generic 
parameter V.\n\n\nThere is no limit on how many ports an operator can have. 
However\nany operator must have at least one port. An operator with only one 
port\nis called an Input Adapter if it has no input port and an Output 
Adapter\nif it has no output port. These are special operators needed to 
get/read\ndata from outside system/source into the application, or push/write 
data\ninto an outside system/sink. These could be in Hadoop or outside 
of\nHadoop. These two operators are in essence gateways for the 
streaming\napplication to communicate with systems outside the 
application.\n\n\nPort connectivity can be validated during compile time by 
adding\nPortFieldAnnotations shown above. By default all ports have to 
be\nconnected, to allow a port to go unconnected, you need to 
add\n\u201coptional=true\u201d to the annotation.\n\n\nAttributes can be 
specified for ports that affect the runtime\nbehavior. An example of
  an attribute is parallel partition that specifes\na parallel computation flow 
per partition. It is described in detail in\nthe Parallel Partitions section. 
Another example is queue capacity that specifies the buffer size for the\nport. 
Details of attributes are covered in  Operation and Installation 
Guide.\n\n\nProperties\n\n\nProperties are the abstractions by which functional 
behavior of an\noperator can be customized. They should be non-transient 
objects\ninstantiated in the operator object. They need to be non-transient 
since\nthey are part of the operator state and re-construction of the 
operator\nobject from its checkpointed state must restore the operator to 
the\ndesired state. Properties are optional, i.e. an operator may or may 
not\nhave properties; they are part of user code and their values are 
not\ninterpreted by the platform in any way.\n\n\nAll non-serializable objects 
should be declared transient.\nExamples include sockets, session information, 
etc. These objects sho
 uld\nbe initialized during setup call, which is called every time 
the\noperator is initialized.\n\n\nAttributes\n\n\nAttributes are values 
assigned to the operators that impact\nrun-time. This includes things like the 
number of partitions, at most\nonce or at least once or exactly once recovery 
modes, etc. Attributes do\nnot impact functionality of the operator. Users can 
change certain\nattributes in runtime. Users cannot add attributes to 
operators; they\nare pre-defined by the platform. They are interpreted by the 
platform\nand thus cannot be defined in user created code (like 
properties).\nDetails of attributes are covered in  \nConfiguration 
Guide\n.\n\n\nOperator State\n\n\nThe state of an operator is defined as the 
data that it transfers\nfrom one window to a future window. Since the computing 
model of the\nplatform is to treat windows like micro-batches, the operator 
state can\nbe checkpointed every Nth window, or every T units of time, where T 
is significantly greater\nthan
  the streaming window. \u00a0When an operator is checkpointed, the 
entire\nobject is written to HDFS. \u00a0The larger the amount of state in 
an\noperator, the longer it takes to recover from a failure. A 
stateless\noperator can recover much quicker than a stateful one. The 
needed\nwindows are preserved by the upstream buffer server and are used 
to\nrecompute the lost windows, and also rebuild the buffer server in 
the\ncurrent container.\n\n\nThe distinction between Stateless and Stateful is 
based solely on\nthe need to transfer data in the operator from one window to 
the next.\nThe state of an operator is independent of the number of 
ports.\n\n\nStateless\n\n\nA Stateless operator is defined as one where no data 
is needed to\nbe kept at the end of every window. This means that all the 
computations\nof a window can be derived from all the tuples the operator 
receives\nwithin that window. This guarantees that the output of any window can 
be\nreconstructed by simply replaying the tupl
 es that arrived in that\nwindow. Stateless operators are more efficient in 
terms of fault\ntolerance, and cost to achieve SLA.\n\n\nStateful\n\n\nA 
Stateful operator is defined as one where data is needed to be\nstored at the 
end of a window for computations occurring in later\nwindow; a common example 
is the computation of a sum of values in the\ninput tuples.\n\n\nOperator 
API\n\n\nThe Operator API consists of methods that operator developers 
may\nneed to override. In this section we will discuss the Operator APIs 
from\nthe point of view of an application developer. Knowledge of how 
an\noperator works internally is critical for writing an application. 
Those\ninterested in the details should refer to  Malhar Operator Developer 
Guide.\n\n\nThe APIs are available in three modes, namely Single 
Streaming\nWindow, Sliding Application Window, and Aggregate Application 
Window.\nThese are not mutually exclusive, i.e. an operator can use 
single\nstreaming window as well as sliding applicati
 on window. A physical\ninstance of an operator is always processing tuples 
from a single\nwindow. The processing of tuples is guaranteed to be sequential, 
no\nmatter which input port the tuples arrive on.\n\n\nIn the later part of 
this section we will evaluate three common\nuses of streaming windows by 
applications. They have different\ncharacteristics and implications on 
optimization and recovery mechanisms\n(i.e. algorithm used to recover a node 
after outage) as discussed later\nin the section.\n\n\nStreaming 
Window\n\n\nStreaming window is atomic micro-batch computation period. The 
API\nmethods relating to a streaming window are as follows\n\n\npublic void 
process(\ntuple_type\n tuple) // Called on the input port on which the tuple 
arrives\npublic void beginWindow(long windowId) // Called at the start of the 
window as soon as the first begin_window tuple arrives\npublic void endWindow() 
// Called at the end of the window after end_window tuples arrive on all input 
ports\npublic v
 oid setup(OperatorContext context) // Called once during initialization of the 
operator\npublic void teardown() // Called once when the operator is being 
shutdown\n\n\n\n\nA tuple can be emitted in any of the three streaming 
run-time\ncalls, namely beginWindow, process, and endWindow but not in setup 
or\nteardown.\n\n\nAggregate Application Window\n\n\nAn operator with an 
aggregate window is stateful within the\napplication window timeframe and 
possibly stateless at the end of that\napplication window. An size of an 
aggregate application window is an\noperator attribute and is defined as a 
multiple of the streaming window\nsize. The platform recognizes this attribute 
and optimizes the operator.\nThe beginWindow, and endWindow calls are not 
invoked for those streaming\nwindows that do not align with the application 
window. For example in\ncase of streaming window of 0.5 second and application 
window of 5\nminute, an application window spans 600 streaming windows (5*60*2 
=\n600). At t
 he start of the sequence of these 600 atomic streaming\nwindows, a beginWindow 
gets invoked, and at the end of these 600\nstreaming windows an endWindow gets 
invoked. All the intermediate\nstreaming windows do not invoke beginWindow or 
endWindow. Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off 
streaming windows.\nFor example if operators are being checkpointed say on an 
average every\n30th window, then the above application window would have about 
20\ncheckpoints.\n\n\nSliding Application Window\n\n\nA sliding window is 
computations that requires previous N\nstreaming windows. After each streaming 
window the Nth past window is\ndropped and the new window is added to the 
computation. An operator with\nsliding window is a stateful operator at end of 
any window. The sliding\nwindow period is an attribute and is a multiple of 
streaming window. The\nplatform recognizes this attribute and leverages it 
during bookkeeping.\nA sliding aggregate window with tolerance to data
  loss does not have a\nvery high bookkeeping cost. The cost of all three 
recovery mechanisms,\n at most once\u00a0(data loss tolerant),\nat least 
once\u00a0(data loss\nintolerant), and exactly once\u00a0(data\nloss intolerant 
and no extra computations) is same as recovery\nmechanisms based on streaming 
window. STRAM is not able to leverage this\noperator for any extra 
optimization.\n\n\nSingle vs Multi-Input Operator\n\n\nA single-input operator 
by definition has a single upstream\noperator, since there can only be one 
writing port for a stream. \u00a0If an\noperator has a single upstream 
operator, then the beginWindow on the\nupstream also blocks the beginWindow of 
the single-input operator. For\nan operator to start processing any window at 
least one upstream\noperator has to start processing that window. A multi-input 
operator\nreads from more than one upstream ports. Such an operator would 
start\nprocessing as soon as the first begin_window event arrives. However 
the\nwindow wou
 ld not close (i.e. invoke endWindow) till all ports receive\nend_window events 
for that windowId. Thus the end of a window is a\nblocking event. As we saw 
earlier, a multi-input operator is also the\npoint in the DAG where windows of 
all upstream operators are\nsynchronized. The windows (atomic micro-batches) 
from a faster (or just\nahead in processing) upstream operators are queued up 
till the slower\nupstream operator catches up. STRAM monitors such bottlenecks 
and takes\ncorrective actions. The platform ensures minimal delay, i.e 
processing\nstarts as long as at least one upstream operator has 
started\nprocessing.\n\n\nRecovery Mechanisms\n\n\nApplication developers can 
set any of the recovery mechanisms\nbelow to deal with node outage. In general, 
the cost of recovery depends\non the state of the operator, while data 
integrity is dependant on the\napplication. The mechanisms are per window as 
the platform treats\nwindows as atomic compute units. Three recovery mechanisms 
are\nsu
 pported, namely\n\n\n\n\nAt-least-once: All atomic batches are processed at 
least once.\n    No data loss occurs.\n\n\nAt-most-once: All atomic batches are 
processed at most once.\n    Data loss is possible; this is the most efficient 
setting.\n\n\nExactly-once: All atomic batches are processed exactly once.\n    
No data loss occurs; this is the least efficient setting since\n    additional 
work is needed to ensure proper semantics.\n\n\n\n\nAt-least-once is the 
default. During a recovery event, the\noperator connects to the upstream buffer 
server and asks for windows to\nbe replayed. At-least-once and exactly-once 
mechanisms start from its\ncheckpointed state. At-most-once starts from the 
next begin-window\nevent.\n\n\nRecovery mechanisms can be specified per 
Operator while writing\nthe application as shown below.\n\n\nOperator o = 
dag.addOperator(\u201coperator\u201d, \u2026);\ndag.setAttribute(o,  
OperatorContext.PROCESSING_MODE,  ProcessingMode.AT_MOST_ONCE);\n\n\n\n\nAlso 
note 
 that once an operator is attributed to AT_MOST_ONCE,\nall the operators 
downstream to it have to be AT_MOST_ONCE. The client\nwill give appropriate 
warnings or errors if that\u2019s not the case.\n\n\nDetails are explained in 
the chapter on Fault Tolerance below.\n\n\nStreams\n\n\nA stream\u00a0is a 
connector\n(edge) abstraction, and is a fundamental building block of the 
platform.\nA stream consists of tuples that flow from one port (called 
the\noutput\u00a0port) to one or more ports\non other operators (called  
input\u00a0ports) another -- so note a potentially\nconfusing aspect of this 
terminology: tuples enter a stream through its\noutput port and leave via one 
or more input ports. A stream has the\nfollowing 
characteristics\n\n\n\n\nTuples are always delivered in the same order in which 
they\n    were emitted.\n\n\nConsists of a sequence of windows one after 
another. Each\n    window being a collection of in-order tuples.\n\n\nA stream 
that connects two containers passes throug
 h a\n    buffer server.\n\n\nAll streams can be persisted (by default in 
HDFS).\n\n\nExactly one output port writes to the stream.\n\n\nCan be read by 
one or more input ports.\n\n\nConnects operators within an application, not 
outside\n    an application.\n\n\nHas an unique name within an 
application.\n\n\nHas attributes which act as hints to STRAM.\n\n\n\n\nStreams 
have four modes, namely in-line, in-node, in-rack,\n    and other. Modes may be 
overruled (for example due to lack\n    of containers). They are defined as 
follows:\n\n\n\n\nTHREAD_LOCAL: In the same thread, uses thread\n    stack 
(intra-thread). This mode can only be used for a downstream\n    operator which 
has only one input port connected; also called\n    
in-line.\n\n\nCONTAINER_LOCAL: In the same container (intra-process); also\n    
called in-container.\n\n\nNODE_LOCAL: In the same Hadoop node (inter processes, 
skips\n    NIC); also called in-node.\n\n\nRACK_LOCAL: On nodes in the same 
rack; also called\n    in-rac
 k.\n\n\nunspecified: No guarantee. Could be anywhere within the\n    
cluster\n\n\n\n\n\n\n\n\nAn example of a stream declaration is given 
below\n\n\nDAG dag = new DAG();\n \u2026\ndag.addStream(\nviews\n, 
viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container 
local  stream\ndag.addStream(\u201cclicks\u201d, clickAggregate.sum, rev.data); 
// An example of unspecified locality\n\n\n\n\nThe platform guarantees in-order 
delivery of tuples in a stream.\nSTRAM views each stream as collection of 
ordered windows. Since no tuple\ncan exist outside a window, a replay of a 
stream consists of replay of a\nset of windows. When multiple input ports read 
the same stream, the\nexecution plan of a stream ensures that each input port 
is logically not\nblocked by the reading of another input port. The schema of a 
stream is\nsame as the schema of the tuple.\n\n\nIn a stream all tuples emitted 
by an operator in a window belong\nto that window. A replay of this window 
would consists o
 f an in-order\nreplay of all the tuples. Thus the tuple order within a stream 
is\nguaranteed. However since an operator may receive multiple streams 
(for\nexample an operator with two input p

<TRUNCATED>

Reply via email to