http://git-wip-us.apache.org/repos/asf/apex-site/blob/82e5a921/content/docs/apex-3.6/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/content/docs/apex-3.6/mkdocs/search_index.json 
b/content/docs/apex-3.6/mkdocs/search_index.json
new file mode 100644
index 0000000..a82b130
--- /dev/null
+++ b/content/docs/apex-3.6/mkdocs/search_index.json
@@ -0,0 +1,1224 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex\n\n\nApex is a Hadoop YARN native big data 
processing platform, enabling real time stream as well as batch processing for 
your big data.  Apex provides the following benefits:\n\n\n\n\nHigh scalability 
and performance\n\n\nFault tolerance and state management\n\n\nHadoop-native 
YARN \n HDFS implementation\n\n\nEvent processing guarantees\n\n\nSeparation of 
functional and operational concerns\n\n\nSimple API supports generic Java 
code\n\n\n\n\nPlatform has been demonstated to scale linearly across Hadoop 
clusters under extreme loads of billions of events per second.  Hardware and 
process failures are quickly recovered with HDFS-backed checkpointing and 
automatic operator recovery, preserving application state and resuming 
execution in seconds.  Functional and operational specifications are separated. 
 Apex provides a simple API, which enables users to write generic, reusable 
code.  The code is dropped in as-is and platform automatically handles the va
 rious operational concerns, such as state management, fault tolerance, 
scalability, security, metrics, etc.  This frees users to focus on functional 
development, and lets platform provide operability support.\n\n\nThe core Apex 
platform is supplemented by Malhar, a library of connector and logic functions, 
enabling rapid application development.  These operators and modules provide 
access to HDFS, S3, NFS, FTP, and other file systems; Kafka, ActiveMQ, 
RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, Redis, 
HBase, CouchDB, generic JDBC, and other database connectors.  In addition to 
the operators, the library contains a number of demos applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit \nApex Malhar on 
Github\n\n\nFor additional information visit \nApache Apex\n.", 
+            "title": "Apache Apex"
+        }, 
+        {
+            "location": "/#apache-apex", 
+            "text": "Apex is a Hadoop YARN native big data processing 
platform, enabling real time stream as well as batch processing for your big 
data.  Apex provides the following benefits:   High scalability and performance 
 Fault tolerance and state management  Hadoop-native YARN   HDFS implementation 
 Event processing guarantees  Separation of functional and operational concerns 
 Simple API supports generic Java code   Platform has been demonstated to scale 
linearly across Hadoop clusters under extreme loads of billions of events per 
second.  Hardware and process failures are quickly recovered with HDFS-backed 
checkpointing and automatic operator recovery, preserving application state and 
resuming execution in seconds.  Functional and operational specifications are 
separated.  Apex provides a simple API, which enables users to write generic, 
reusable code.  The code is dropped in as-is and platform automatically handles 
the various operational concerns, such as state management
 , fault tolerance, scalability, security, metrics, etc.  This frees users to 
focus on functional development, and lets platform provide operability support. 
 The core Apex platform is supplemented by Malhar, a library of connector and 
logic functions, enabling rapid application development.  These operators and 
modules provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, 
ActiveMQ, RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, 
Redis, HBase, CouchDB, generic JDBC, and other database connectors.  In 
addition to the operators, the library contains a number of demos applications, 
demonstrating operator features and capabilities.  To see the full list of 
available operators and related documentation, visit  Apex Malhar on Github  
For additional information visit  Apache Apex .", 
+            "title": "Apache Apex"
+        }, 
+        {
+            "location": "/apex_development_setup/", 
+            "text": "Apache Apex Development Environment Setup\n\n\nThis 
document discusses the steps needed for setting up a development environment 
for creating applications that run on the Apache Apex 
platform.\n\n\nDevelopment Tools\n\n\nThere are a few tools that will be 
helpful when developing Apache Apex applications, including:\n\n\n\n\n\n\ngit\n 
- A revision control system (version 1.7.1 or later). There are multiple git 
clients available for Windows (\nhttp://git-scm.com/download/win\n for 
example), so download and install a client of your choice.\n\n\n\n\n\n\njava 
JDK\n (not JRE) - Includes the Java Runtime Environment as well as the Java 
compiler and a variety of tools (version 1.7.0_79 or later). Can be downloaded 
from the Oracle website.\n\n\n\n\n\n\nmaven\n - Apache Maven is a build system 
for Java projects (version 3.0.5 or later). It can be downloaded from 
\nhttps://maven.apache.org/download.cgi\n.\n\n\n\n\n\n\nIDE\n (Optional) - If 
you prefer to use an IDE (Integra
 ted Development Environment) such as \nNetBeans\n, \nEclipse\n or 
\nIntelliJ\n, install that as well.\n\n\n\n\n\n\nAfter installing these tools, 
make sure that the directories containing the executable files are in your PATH 
environment variable.\n\n\n\n\nWindows\n - Open a console window and enter the 
command \necho %PATH%\n to see the value of the \nPATH\n variable and verify 
that the above directories for Java, git, and maven executables are present.  
JDK executables like \njava\n and \njavac\n, the directory might be something 
like \nC:\\Program Files\\Java\\jdk1.7.0\\_80\\bin\n; for \ngit\n it might be 
\nC:\\Program Files\\Git\\bin\n; and for maven it might be 
\nC:\\Users\\user\\Software\\apache-maven-3.3.3\\bin\n.  If not, you can change 
its value clicking on the button at \nControl Panel\n \n \nAdvanced System 
Settings\n \n \nAdvanced tab\n \n \nEnvironment Variables\n.\n\n\nLinux and 
Mac\n - Open a console/terminal window and enter the command \necho $PATH\n to 
see the value
  of the \nPATH\n variable and verify that the above directories for Java, git, 
and maven executables are present.  If not, make sure software is downloaded 
and installed, and optionally PATH reference is added and exported  in a 
\n~/.profile\n or \n~/.bash_profile\n.  For example to add maven located in 
\n/sfw/maven/apache-maven-3.3.3\n to PATH add the line: \nexport 
PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin\n\n\n\n\nConfirm by running the 
following commands and comparing with output that show in the table 
below:\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nCommand\n\n\nOutput\n\n\n\n\n\n\njavac 
-version\n\n\njavac 1.7.0_80\n\n\n\n\n\n\njava -version\n\n\njava version 
\n1.7.0_80\n\n\nJava(TM) SE Runtime Environment (build 1.7.0_80-b15)\n\n\nJava 
HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)\n\n\n\n\n\n\ngit 
--version\n\n\ngit version 2.6.1.windows.1\n\n\n\n\n\n\nmvn 
--version\n\n\nApache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 
2015-04-22T06:57:37-05:00)\n\n\n...\n
 \n\n\n\n\n\n\n\n\n\n\nCreating New Apex Project\n\n\nAfter development tools 
are configured, you can now use the maven archetype to create a basic Apache 
Apex project.  \nNote:\n When executing the commands below, replace \n3.4.0\n 
by \nlatest available version\n of Apache Apex.\n\n\n\n\n\n\nWindows\n - Create 
a new Windows command file called \nnewapp.cmd\n by copying the lines below, 
and execute it.  When you run this file, the properties will be displayed and 
you will be prompted with \nY: :\n; just press \nEnter\n to complete the 
project generation.  The caret (^) at the end of some lines indicates that a 
continuation line follows. \n\n\n@echo off\n@rem Script for creating a new 
application\nsetlocal\nmvn archetype:generate ^\n 
-DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype 
-DarchetypeVersion=3.4.0 ^\n -DgroupId=com.example 
-Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n 
-Dversion=1.0-SNAPSHOT\nendlocal\n\n\n\n\n\n\n\nLinux\n - Execute
  the lines below in a terminal window.  New project will be created in the 
curent working directory.  The backslash (\\) at the end of the lines indicates 
continuation.\n\n\nmvn archetype:generate \\\n 
-DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \\\n 
-DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp 
\\\n -Dversion=1.0-SNAPSHOT\n\n\n\n\n\n\n\nWhen the run completes successfully, 
you should see a new directory named \nmyapexapp\n containing a maven project 
for building a basic Apache Apex application. It includes 3 source 
files:\nApplication.java\n,  \nRandomNumberGenerator.java\n and 
\nApplicationTest.java\n. You can now build the application by stepping into 
the new directory and running the maven package command:\n\n\ncd myapexapp\nmvn 
clean package -DskipTests\n\n\n\nThe build should create the application 
package file \nmyapexapp/target/myapexapp-1.0-SNAPSHOT.apa\n. This application 
package c
 an then be used to launch example application via \napex\n CLI, or other 
visual management tools.  When running, this application will generate a stream 
of random numbers and print them out, each prefixed by the string \nhello 
world:\n.\n\n\nRunning Unit Tests\n\n\nTo run unit tests on Linux or OSX, 
simply run the usual maven command, for example: \nmvn test\n.\n\n\nOn Windows, 
an additional file, \nwinutils.exe\n, is required; download it 
from\n\nhttps://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip\n\nand
 unpack the archive to, say, \nC:\\hadoop\n; this file should be present 
under\n\nhadoop-common-2.2.0-bin-master\\bin\n within it.\n\n\nSet the 
\nHADOOP_HOME\n environment variable system-wide 
to\n\nc:\\hadoop\\hadoop-common-2.2.0-bin-master\n as described 
at:\n\nhttps://www.microsoft.com/resources/documentation/windows/xp/all/proddocs/en-us/sysdm_advancd_environmnt_addchange_variable.mspx?mfr=true\n.
 You should now be able to run unit tests normally.\n\n\nIf you 
 prefer not to set the variable globally, you can set it on the command line or 
within\nyour IDE. For example, on the command line, specify the maven\nproperty 
\nhadoop.home.dir\n:\n\n\nmvn 
-Dhadoop.home.dir=c:\\hadoop\\hadoop-common-2.2.0-bin-master test\n\n\n\nor set 
the environment variable separately:\n\n\nset 
HADOOP_HOME=c:\\hadoop\\hadoop-common-2.2.0-bin-master\nmvn test\n\n\n\nWithin 
your IDE, set the environment variable and then run the desired\nunit test in 
the usual way. For example, with NetBeans you can 
add:\n\n\nEnv.HADOOP_HOME=c:/hadoop/hadoop-common-2.2.0-bin-master\n\n\n\nat 
\nProperties \n Actions \n Run project \n Set Properties\n.\n\n\nSimilarly, in 
Eclipse (Mars) add it to the\nproject properties at \nProperties \n Run/Debug 
Settings \n ApplicationTest\n\n Environment\n tab.\n\n\nBuilding Apex 
Demos\n\n\nIf you want to see more substantial Apex demo applications and the 
associated source code, you can follow these simple steps to check out and 
build them.\n\n\n\
 n\n\n\nCheck out the source code repositories:\n\n\ngit clone 
https://github.com/apache/apex-core\ngit clone 
https://github.com/apache/apex-malhar\n\n\n\n\n\n\n\nSwitch to the appropriate 
release branch and build each repository:\n\n\ncd apex-core\nmvn clean install 
-DskipTests\n\ncd apex-malhar\nmvn clean install -DskipTests\n\n\n\n\n\n\n\nThe 
\ninstall\n argument to the \nmvn\n command installs resources from each 
project to your local maven repository (typically \n.m2/repository\n under your 
home directory), and \nnot\n to the system directories, so Administrator 
privileges are not required. The  \n-DskipTests\n argument skips running unit 
tests since they take a long time. If this is a first-time installation, it 
might take several minutes to complete because maven will download a number of 
associated plugins.\n\n\nAfter the build completes, you should see the demo 
application package files in the target directory under each demo subdirectory 
in \napex-malhar/demos\n.\n\n\nSandb
 ox\n\n\nTo jump-start development with Apex, please refer to the \nDownloads\n 
section of the Apache Apex website, which provides a list of 3rd party Apex 
binary packages and sandbox environments.", 
+            "title": "Development Setup"
+        }, 
+        {
+            "location": 
"/apex_development_setup/#apache-apex-development-environment-setup", 
+            "text": "This document discusses the steps needed for setting up a 
development environment for creating applications that run on the Apache Apex 
platform.", 
+            "title": "Apache Apex Development Environment Setup"
+        }, 
+        {
+            "location": "/apex_development_setup/#development-tools", 
+            "text": "There are a few tools that will be helpful when 
developing Apache Apex applications, including:    git  - A revision control 
system (version 1.7.1 or later). There are multiple git clients available for 
Windows ( http://git-scm.com/download/win  for example), so download and 
install a client of your choice.    java JDK  (not JRE) - Includes the Java 
Runtime Environment as well as the Java compiler and a variety of tools 
(version 1.7.0_79 or later). Can be downloaded from the Oracle website.    
maven  - Apache Maven is a build system for Java projects (version 3.0.5 or 
later). It can be downloaded from  https://maven.apache.org/download.cgi .    
IDE  (Optional) - If you prefer to use an IDE (Integrated Development 
Environment) such as  NetBeans ,  Eclipse  or  IntelliJ , install that as well. 
   After installing these tools, make sure that the directories containing the 
executable files are in your PATH environment variable.   Windows  - Open a 
console window and
  enter the command  echo %PATH%  to see the value of the  PATH  variable and 
verify that the above directories for Java, git, and maven executables are 
present.  JDK executables like  java  and  javac , the directory might be 
something like  C:\\Program Files\\Java\\jdk1.7.0\\_80\\bin ; for  git  it 
might be  C:\\Program Files\\Git\\bin ; and for maven it might be  
C:\\Users\\user\\Software\\apache-maven-3.3.3\\bin .  If not, you can change 
its value clicking on the button at  Control Panel     Advanced System Settings 
    Advanced tab     Environment Variables .  Linux and Mac  - Open a 
console/terminal window and enter the command  echo $PATH  to see the value of 
the  PATH  variable and verify that the above directories for Java, git, and 
maven executables are present.  If not, make sure software is downloaded and 
installed, and optionally PATH reference is added and exported  in a  
~/.profile  or  ~/.bash_profile .  For example to add maven located in  
/sfw/maven/apache-maven-3.3
 .3  to PATH add the line:  export PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin 
  Confirm by running the following commands and comparing with output that show 
in the table below:         Command  Output    javac -version  javac 1.7.0_80   
 java -version  java version  1.7.0_80  Java(TM) SE Runtime Environment (build 
1.7.0_80-b15)  Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)  
  git --version  git version 2.6.1.windows.1    mvn --version  Apache Maven 
3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T06:57:37-05:00)  
...", 
+            "title": "Development Tools"
+        }, 
+        {
+            "location": "/apex_development_setup/#creating-new-apex-project", 
+            "text": "After development tools are configured, you can now use 
the maven archetype to create a basic Apache Apex project.   Note:  When 
executing the commands below, replace  3.4.0  by  latest available version  of 
Apache Apex.    Windows  - Create a new Windows command file called  newapp.cmd 
 by copying the lines below, and execute it.  When you run this file, the 
properties will be displayed and you will be prompted with  Y: : ; just press  
Enter  to complete the project generation.  The caret (^) at the end of some 
lines indicates that a continuation line follows.   @echo off\n@rem Script for 
creating a new application\nsetlocal\nmvn archetype:generate ^\n 
-DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype 
-DarchetypeVersion=3.4.0 ^\n -DgroupId=com.example 
-Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n 
-Dversion=1.0-SNAPSHOT\nendlocal    Linux  - Execute the lines below in a 
terminal window.  New project will be created in t
 he curent working directory.  The backslash (\\) at the end of the lines 
indicates continuation.  mvn archetype:generate \\\n 
-DarchetypeGroupId=org.apache.apex \\\n 
-DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \\\n 
-DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp 
\\\n -Dversion=1.0-SNAPSHOT    When the run completes successfully, you should 
see a new directory named  myapexapp  containing a maven project for building a 
basic Apache Apex application. It includes 3 source files: Application.java ,   
RandomNumberGenerator.java  and  ApplicationTest.java . You can now build the 
application by stepping into the new directory and running the maven package 
command:  cd myapexapp\nmvn clean package -DskipTests  The build should create 
the application package file  myapexapp/target/myapexapp-1.0-SNAPSHOT.apa . 
This application package can then be used to launch example application via  
apex  CLI, or other visual management tools.  When runnin
 g, this application will generate a stream of random numbers and print them 
out, each prefixed by the string  hello world: .", 
+            "title": "Creating New Apex Project"
+        }, 
+        {
+            "location": "/apex_development_setup/#running-unit-tests", 
+            "text": "To run unit tests on Linux or OSX, simply run the usual 
maven command, for example:  mvn test .  On Windows, an additional file,  
winutils.exe , is required; download it from 
https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip \nand 
unpack the archive to, say,  C:\\hadoop ; this file should be present under 
hadoop-common-2.2.0-bin-master\\bin  within it.  Set the  HADOOP_HOME  
environment variable system-wide to c:\\hadoop\\hadoop-common-2.2.0-bin-master  
as described at: 
https://www.microsoft.com/resources/documentation/windows/xp/all/proddocs/en-us/sysdm_advancd_environmnt_addchange_variable.mspx?mfr=true
 . You should now be able to run unit tests normally.  If you prefer not to set 
the variable globally, you can set it on the command line or within\nyour IDE. 
For example, on the command line, specify the maven\nproperty  hadoop.home.dir 
:  mvn -Dhadoop.home.dir=c:\\hadoop\\hadoop-common-2.2.0-bin-master test  or 
set the environment variable
  separately:  set HADOOP_HOME=c:\\hadoop\\hadoop-common-2.2.0-bin-master\nmvn 
test  Within your IDE, set the environment variable and then run the 
desired\nunit test in the usual way. For example, with NetBeans you can add:  
Env.HADOOP_HOME=c:/hadoop/hadoop-common-2.2.0-bin-master  at  Properties   
Actions   Run project   Set Properties .  Similarly, in Eclipse (Mars) add it 
to the\nproject properties at  Properties   Run/Debug Settings   
ApplicationTest  Environment  tab.", 
+            "title": "Running Unit Tests"
+        }, 
+        {
+            "location": "/apex_development_setup/#building-apex-demos", 
+            "text": "If you want to see more substantial Apex demo 
applications and the associated source code, you can follow these simple steps 
to check out and build them.    Check out the source code repositories:  git 
clone https://github.com/apache/apex-core\ngit clone 
https://github.com/apache/apex-malhar    Switch to the appropriate release 
branch and build each repository:  cd apex-core\nmvn clean install 
-DskipTests\n\ncd apex-malhar\nmvn clean install -DskipTests    The  install  
argument to the  mvn  command installs resources from each project to your 
local maven repository (typically  .m2/repository  under your home directory), 
and  not  to the system directories, so Administrator privileges are not 
required. The   -DskipTests  argument skips running unit tests since they take 
a long time. If this is a first-time installation, it might take several 
minutes to complete because maven will download a number of associated plugins. 
 After the build completes, you should see
  the demo application package files in the target directory under each demo 
subdirectory in  apex-malhar/demos .", 
+            "title": "Building Apex Demos"
+        }, 
+        {
+            "location": "/apex_development_setup/#sandbox", 
+            "text": "To jump-start development with Apex, please refer to the  
Downloads  section of the Apache Apex website, which provides a list of 3rd 
party Apex binary packages and sandbox environments.", 
+            "title": "Sandbox"
+        }, 
+        {
+            "location": "/application_development/", 
+            "text": "Application Developer Guide\n\n\nThe Apex platform is 
designed to process massive amounts of\nreal-time events natively in Hadoop.  
It runs as a YARN (Hadoop 2.x) \napplication and leverages Hadoop as a 
distributed operating\nsystem.  All the basic distributed operating system 
capabilities of\nHadoop like resource management (YARN), distributed file 
system (HDFS),\nmulti-tenancy, security, fault-tolerance, and scalability are 
supported natively \nin all the Apex applications. \u00a0The platform handles 
all the details of the application \nexecution, including dynamic scaling, 
state checkpointing and recovery, event \nprocessing guarantees, etc. allowing 
you to focus on writing your application logic without\nmixing operational and 
functional concerns.\n\n\nIn the platform, building a streaming application can 
be extremely\neasy and intuitive. \u00a0The application is represented as a 
Directed\nAcyclic Graph (DAG) of computation units called \nOperators\n interco
 nnected\nby the data-flow edges called  \nStreams\n.\u00a0The operators 
process input\nstreams and produce output streams. A library of common 
operators is\nprovided to enable quick application development. \u00a0In case 
the desired\nprocessing is not available in the Operator Library, one can 
easily\nwrite a custom operator. We refer those interested in creating their 
own\noperators to the \nOperator Development Guide\n.\n\n\nRunning A Test 
Application\n\n\nIf you are starting with the Apex platform for the first 
time,\nit can be informative to launch an existing application and see it 
run.\nOne of the simplest examples provided in \nApex-Malhar repository\n is a 
Pi demo application,\nwhich computes the value of PI using random numbers.  
After \nsetting up development environment\n\nPi demo can be launched as 
follows:\n\n\n\n\nOpen up Apex Malhar files in your IDE (for example Eclipse, 
IntelliJ, NetBeans, etc)\n\n\nNavigate to 
\ndemos/pi/src/test/java/com/datatorrent/demos/Applicat
 ionTest.java\n\n\nRun the test for ApplicationTest.java\n\n\nView the output 
in system console\n\n\n\n\nCongratulations, you just ran your first real-time 
streaming demo :) \nThis demo is very simple and has four operators. The first 
operator\nemits random integers between 0 to 30, 000. The second operator 
receives\nthese coefficients and emits a hashmap with x and y values each time 
it\nreceives two values. The third operator takes these values and 
computes\nx**2+y**2. The last operator counts how many computed values 
from\nthe previous operator were less than or equal to 30, 000**2. 
Assuming\nthis count is N, then PI is computed as N/number of values 
received.\nHere is the code snippet for the PI application. This code populates 
the\nDAG. Do not worry about what each line does, we will cover these\nconcepts 
later in this document.\n\n\n// Generates random numbers\nRandomEventGenerator 
rand = dag.addOperator(\nrand\n, new 
RandomEventGenerator());\nrand.setMinvalue(0);\nrand.setMaxv
 alue(30000);\n\n// Generates a round robin HashMap of \nx\n and 
\ny\n\nRoundRobinHashMap\nString,Object\n rrhm = dag.addOperator(\nrrhm\n, new 
RoundRobinHashMap\nString, Object\n());\nrrhm.setKeys(new String[] { \nx\n, 
\ny\n });\n\n// Calculates pi from x and y\nJavaScriptOperator calc = 
dag.addOperator(\npicalc\n, new 
Script());\ncalc.setPassThru(false);\ncalc.put(\ni\n,0);\ncalc.put(\ncount\n,0);\ncalc.addSetupScript(\nfunction
 pi() { if (x*x+y*y \n= \n+maxValue*maxValue+\n) { i++; } count++; return i / 
count * 4; }\n);\ncalc.setInvoke(\npi\n);\ndag.addStream(\nrand_rrhm\n, 
rand.integer_data, rrhm.data);\ndag.addStream(\nrrhm_calc\n, rrhm.map, 
calc.inBindings);\n\n// puts results on system console\nConsoleOutputOperator 
console = dag.addOperator(\nconsole\n, new 
ConsoleOutputOperator());\ndag.addStream(\nrand_console\n,calc.result, 
console.input);\n\n\n\n\nYou can review the other demos and see what they do. 
The examples\ngiven in the Demos project cover various features of the pl
 atform and we\nstrongly encourage you to read these to familiarize yourself 
with the\nplatform. In the remaining part of this document we will go 
through\ndetails needed for you to develop and run streaming applications 
in\nMalhar.\n\n\nTest Application: Yahoo! Finance Quotes\n\n\nThe 
PI\u00a0application was to\nget you started. It is a basic application and does 
not fully illustrate\nthe features of the platform. For the purpose of 
describing concepts, we\nwill consider the test application shown in Figure 1. 
The application\ndownloads tick data from  \nYahoo! Finance\n \u00a0and 
computes the\nfollowing for four tickers, namely \nIBM\n,\n\nGOOG\n, 
\nYHOO\n.\n\n\n\n\nQuote: Consisting of last trade price, last trade time, 
and\n    total volume for the day\n\n\nPer-minute chart data: Highest trade 
price, lowest trade\n    price, and volume during that minute\n\n\nSimple 
Moving Average: trade price over 5 minutes\n\n\n\n\nTotal volume must ensure 
that all trade volume for that day is\
 nadded, i.e. data loss would result in wrong results. Charting data needs\nall 
the trades in the same minute to go to the same slot, and then on it\nstarts 
afresh, so again data loss would result in wrong results. The\naggregation for 
charting data is done over 1 minute. Simple moving\naverage computes the 
average price over a 5 minute sliding window; it\ntoo would produce wrong 
results if there is data loss. Figure 1 shows\nthe application with no 
partitioning.\n\n\n\n\nThe operator 
StockTickerInput:\u00a0StockTickerInput\n\u00a0\nis\nthe input operator that 
reads live data from Yahoo! Finance once per\ninterval (user configurable in 
milliseconds), and emits the price, the\nincremental volume, and the last trade 
time of each stock symbol, thus\nemulating real ticks from the exchange. 
\u00a0We utilize the Yahoo! Finance\nCSV web service interface. \u00a0For 
example:\n\n\n$ GET 
'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1'\n\nIBM\n,203.966,1513041,\n
 
1:43pm\n\n\nGOOG\n,762.68,1879741,\n1:43pm\n\n\nAAPL\n,444.3385,11738366,\n1:43pm\n\n\nYHOO\n,19.3681,14707163,\n1:43pm\n\n\n\n\n\nAmong
 all the operators in Figure 1, StockTickerInput is the only\noperator that 
requires extra code because it contains a custom mechanism\nto get the input 
data. \u00a0Other operators are used unchanged from the\nMalhar 
library.\n\n\nHere is the class implementation for StockTickInput:\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
au.com.bytecode.opencsv.CSVReader;\nimport 
com.datatorrent.annotation.OutputPortFieldAnnotation;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DefaultOutputPort;\nimport 
com.datatorrent.api.InputOperator;\nimport 
com.datatorrent.lib.util.KeyValPair;\nimport java.io.IOException;\nimport 
java.io.InputStream;\nimport java.io.InputStreamReader;\nimport 
java.util.*;\nimport org.apache.commons.httpclient.HttpClient;\nimport 
org.apache.commons.httpclient.HttpStatus;\nimport org.apache.
 commons.httpclient.cookie.CookiePolicy;\nimport 
org.apache.commons.httpclient.methods.GetMethod;\nimport 
org.apache.commons.httpclient.params.DefaultHttpParams;\nimport 
org.slf4j.Logger;\nimport org.slf4j.LoggerFactory;\n\n/**\n * This operator 
sends price, volume and time into separate ports and calculates incremental 
volume.\n */\npublic class StockTickInput implements InputOperator\n{\n  
private static final Logger logger = 
LoggerFactory.getLogger(StockTickInput.class);\n  /**\n   * Timeout interval 
for reading from server. 0 or negative indicates no timeout.\n   */\n  public 
int readIntervalMillis = 500;\n  /**\n   * The URL of the web service resource 
for the POST request.\n   */\n  private String url;\n  public String[] 
symbols;\n  private transient HttpClient client;\n  private transient GetMethod 
method;\n  private HashMap\nString, Long\n lastVolume = new HashMap\nString, 
Long\n();\n  private boolean outputEvenIfZeroVolume = false;\n  /**\n   * The 
output port to emit price.
 \n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final 
transient DefaultOutputPort\nKeyValPair\nString, Double\n price = new 
DefaultOutputPort\nKeyValPair\nString, Double\n();\n  /**\n   * The output port 
to emit incremental volume.\n   */\n  @OutputPortFieldAnnotation(optional = 
true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, Long\n 
volume = new DefaultOutputPort\nKeyValPair\nString, Long\n();\n  /**\n   * The 
output port to emit last traded time.\n   */\n  
@OutputPortFieldAnnotation(optional = true)\n  public final transient 
DefaultOutputPort\nKeyValPair\nString, String\n time = new 
DefaultOutputPort\nKeyValPair\nString, String\n();\n\n  /**\n   * Prepare URL 
from symbols and parameters. URL will be something like: 
http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1\n 
  *\n   * @return the URL\n   */\n  private String prepareURL()\n  {\n    
String str = \nhttp://download.finance.yahoo.com/d/quotes.csv?s=\n;\n    for 
 (int i = 0; i \n symbols.length; i++) {\n      if (i != 0) {\n        str += 
\n,\n;\n      }\n      str += symbols[i];\n    }\n    str += 
\nf=sl1vt1\ne=.csv\n;\n    return str;\n  }\n\n  @Override\n  public void 
setup(OperatorContext context)\n  {\n    url = prepareURL();\n    client = new 
HttpClient();\n    method = new GetMethod(url);\n    
DefaultHttpParams.getDefaultParams().setParameter(\nhttp.protocol.cookie-policy\n,
 CookiePolicy.BROWSER_COMPATIBILITY);\n  }\n\n  @Override\n  public void 
teardown()\n  {\n  }\n\n  @Override\n  public void emitTuples()\n  {\n\n    try 
{\n      int statusCode = client.executeMethod(method);\n      if (statusCode 
!= HttpStatus.SC_OK) {\n        System.err.println(\nMethod failed: \n + 
method.getStatusLine());\n      } else {\n        InputStream istream = 
method.getResponseBodyAsStream();\n        // Process response\n        
InputStreamReader isr = new InputStreamReader(istream);\n        CSVReader 
reader = new CSVReader(isr);\n        List\nStri
 ng[]\n myEntries = reader.readAll();\n        for (String[] stringArr: 
myEntries) {\n          ArrayList\nString\n tuple = new 
ArrayList\nString\n(Arrays.asList(stringArr));\n          if (tuple.size() != 
4) {\n            return;\n          }\n          // input csv is 
\nSymbol\n,\nPrice\n,\nVolume\n,\nTime\n\n          String symbol = 
tuple.get(0);\n          double currentPrice = Double.valueOf(tuple.get(1));\n  
        long currentVolume = Long.valueOf(tuple.get(2));\n          String 
timeStamp = tuple.get(3);\n          long vol = currentVolume;\n          // 
Sends total volume in first tick, and incremental volume afterwards.\n          
if (lastVolume.containsKey(symbol)) {\n            vol -= 
lastVolume.get(symbol);\n          }\n\n          if (vol \n 0 || 
outputEvenIfZeroVolume) {\n            price.emit(new KeyValPair\nString, 
Double\n(symbol, currentPrice));\n            volume.emit(new 
KeyValPair\nString, Long\n(symbol, vol));\n            time.emit(new 
KeyValPair\nStrin
 g, String\n(symbol, timeStamp));\n            lastVolume.put(symbol, 
currentVolume);\n          }\n        }\n      }\n      
Thread.sleep(readIntervalMillis);\n    } catch (InterruptedException ex) {\n    
  logger.debug(ex.toString());\n    } catch (IOException ex) {\n      
logger.debug(ex.toString());\n    }\n  }\n\n  @Override\n  public void 
beginWindow(long windowId)\n  {\n  }\n\n  @Override\n  public void 
endWindow()\n  {\n  }\n\n  public void setOutputEvenIfZeroVolume(boolean 
outputEvenIfZeroVolume)\n  {\n    this.outputEvenIfZeroVolume = 
outputEvenIfZeroVolume;\n  }\n\n}\n\n\n\n\nThe operator has three output ports 
that emit the price of the\nstock, the volume of the stock and the last trade 
time of the stock,\ndeclared as public member variables price, volume\u00a0and  
time\u00a0of the class. \u00a0The tuple of the\nprice\u00a0output port is a 
key-value\npair with the stock symbol being the key, and the price being the 
value.\n\u00a0The tuple of the volume\u00a0output\nport i
 s a key value pair with the stock symbol being the key, and the\nincremental 
volume being the value. \u00a0The tuple of the  time\u00a0output port is a key 
value pair with the\nstock symbol being the key, and the last trade time being 
the\nvalue.\n\n\nImportant: Since operators will be\nserialized, all input and 
output ports need to be declared transient\nbecause they are stateless and 
should not be serialized.\n\n\nThe method\u00a0setup(OperatorContext)\ncontains 
the code that is necessary for setting up the HTTP\nclient for querying Yahoo! 
Finance.\n\n\nMethod\u00a0emitTuples() contains\nthe code that reads from 
Yahoo! Finance, and emits the data to the\noutput ports of the operator. 
\u00a0emitTuples()\u00a0will be called one or more times\nwithin one 
application window as long as time is allowed within the\nwindow.\n\n\nNote 
that we want to emulate the tick input stream by having\nincremental volume 
data with Yahoo! Finance data. \u00a0We therefore subtract\nthe previous volume 
f
 rom the current volume to emulate incremental\nvolume for each tick.\n\n\nThe 
operator\nDailyVolume:\u00a0This operator\nreads from the input port, which 
contains the incremental volume tuples\nfrom StockTickInput, and\naggregates 
the data to provide the cumulative volume. \u00a0It uses the\nlibrary class  
SumKeyVal\nK,V\n\u00a0provided in math\u00a0package. \u00a0In this 
case,\nSumKeyVal\nString,Long\n, where K is the stock symbol, V is 
the\naggregated volume, with cumulative\nset to true. (Otherwise if  
cumulativewas set to false, SumKeyVal would\nprovide the sum for the 
application window.) \u00a0Malhar provides a number\nof built-in operators for 
simple operations like this so that\napplication developers do not have to 
write them. \u00a0More examples to\nfollow. This operator assumes that the 
application restarts before the\nmarket opens every day.\n\n\nThe operator 
Quote:\nThis operator has three input ports, which are price 
(from\nStockTickInput), daily_vol (from\nDaily Volum
 e), and time (from\n StockTickInput). \u00a0This operator\njust consolidates 
the three data items and and emits the consolidated\ndata. \u00a0It utilizes 
the class ConsolidatorKeyVal\nK\n\u00a0from the\nstream\u00a0package.\n\n\nThe 
operator HighLow:\u00a0This operator reads from the input port,\nwhich contains 
the price tuples from StockTickInput, and provides the high and the\nlow price 
within the application window. \u00a0It utilizes the library class\n 
RangeKeyVal\nK,V\n\u00a0provided\nin the math\u00a0package. In this 
case,\nRangeKeyVal\nString,Double\n.\n\n\nThe operator MinuteVolume:\nThis 
operator reads from the input port, which contains the\nvolume tuples from 
StockTickInput,\nand aggregates the data to provide the sum of the volume 
within one\nminute. \u00a0Like the operator  DailyVolume, this operator also 
uses\nSumKeyVal\nString,Long\n, but\nwith cumulative set to false. 
\u00a0The\nApplication Window is set to one minute. We will explain how to set 
this\nlater.\n\n\nThe
  operator Chart:\nThis operator is very similar to the operator Quote, except 
that it takes inputs from\nHigh Low\u00a0and  Minute Vol\u00a0and outputs the 
consolidated tuples\nto the output port.\n\n\nThe operator PriceSMA:\nSMA 
stands for - Simple Moving Average. It reads from the\ninput port, which 
contains the price tuples from StockTickInput, and\nprovides the moving average 
price of the stock. \u00a0It utilizes\nSimpleMovingAverage\nString,Double\n, 
which is provided in the\n multiwindow\u00a0package.\nSimpleMovingAverage keeps 
track of the data of the previous N\napplication windows in a sliding manner. 
\u00a0For each end window event, it\nprovides the average of the data in those 
application windows.\n\n\nThe operator Console:\nThis operator just outputs the 
input tuples to the console\n(or stdout). \u00a0In this example, there are four 
console\u00a0operators, which connect to the output\nof  Quote, Chart, PriceSMA 
and VolumeSMA. \u00a0In\npractice, they should be replaced b
 y operators that use the data to\nproduce visualization artifacts like 
charts.\n\n\nConnecting the operators together and constructing 
the\nDAG:\u00a0Now that we know the\noperators used, we will create the DAG, 
set the streaming window size,\ninstantiate the operators, and connect the 
operators together by adding\nstreams that connect the output ports with the 
input ports among those\noperators. \u00a0This code is in the file  
YahooFinanceApplication.java. Refer to Figure 1\nagain for the graphical 
representation of the DAG. \u00a0The last method in\nthe code, namely 
getApplication(),\ndoes all that. \u00a0The rest of the methods are just for 
setting up the\noperators.\n\n\npackage 
com.datatorrent.demos.yahoofinance;\n\nimport 
com.datatorrent.api.ApplicationFactory;\nimport 
com.datatorrent.api.Context.OperatorContext;\nimport 
com.datatorrent.api.DAG;\nimport 
com.datatorrent.api.Operator.InputPort;\nimport 
com.datatorrent.lib.io.ConsoleOutputOperator;\nimport com.datatorrent.lib.mat
 h.RangeKeyVal;\nimport com.datatorrent.lib.math.SumKeyVal;\nimport 
com.datatorrent.lib.multiwindow.SimpleMovingAverage;\nimport 
com.datatorrent.lib.stream.ConsolidatorKeyVal;\nimport 
com.datatorrent.lib.util.HighLow;\nimport 
org.apache.hadoop.conf.Configuration;\n\n/**\n * Yahoo! Finance application 
demo. \np\n\n *\n * Get Yahoo finance feed and calculate minute price range, 
minute volume, simple moving average of 5 minutes.\n */\npublic class 
Application implements StreamingApplication\n{\n  private int 
streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)\n  
private int appWindowCountMinute = 60;   // 1 minute\n  private int 
appWindowCountSMA = 5 * 60;  // 5 minute\n\n  /**\n   * Get actual Yahoo 
finance ticks of symbol, last price, total daily volume, and last traded 
price.\n   */\n  public StockTickInput getStockTickInputOperator(String name, 
DAG dag)\n  {\n    StockTickInput oper = dag.addOperator(name, 
StockTickInput.class);\n    oper.readIntervalMillis = 200;
 \n    return oper;\n  }\n\n  /**\n   * This sends total daily volume by adding 
volumes from each ticks.\n   */\n  public SumKeyVal\nString, Long\n 
getDailyVolumeOperator(String name, DAG dag)\n  {\n    SumKeyVal\nString, 
Long\n oper = dag.addOperator(name, new SumKeyVal\nString, Long\n());\n    
oper.setType(Long.class);\n    oper.setCumulative(true);\n    return oper;\n  
}\n\n  /**\n   * Get aggregated volume of 1 minute and send at the end window 
of 1 minute.\n   */\n  public SumKeyVal\nString, Long\n 
getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)\n  {\n    
SumKeyVal\nString, Long\n oper = dag.addOperator(name, new SumKeyVal\nString, 
Long\n());\n    oper.setType(Long.class);\n    
oper.setEmitOnlyWhenChanged(true);\ndag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    return oper;\n  }\n\n  /**\n   * Get High-low range for 1 minute.\n   */\n  
public RangeKeyVal\nString, Double\n getHighLowOperator(String name
 , DAG dag, int appWindowCount)\n  {\n    RangeKeyVal\nString, Double\n oper = 
dag.addOperator(name, new RangeKeyVal\nString, Double\n());\n    
dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n
    oper.setType(Double.class);\n    return oper;\n  }\n\n  /**\n   * Quote 
(Merge price, daily volume, time)\n   */\n  public 
ConsolidatorKeyVal\nString,Double,Long,String,?,?\n getQuoteOperator(String 
name, DAG dag)\n  {\n    ConsolidatorKeyVal\nString,Double,Long,String,?,?\n 
oper = dag.addOperator(name, new 
ConsolidatorKeyVal\nString,Double,Long,String,Object,Object\n());\n    return 
oper;\n  }\n\n  /**\n   * Chart (Merge minute volume and minute high-low)\n   
*/\n  public ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n 
getChartOperator(String name, DAG dag)\n  {\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n oper = dag.addOperator(name, 
new ConsolidatorKeyVal\nString,HighLow,Long,Object,Object,Object\n());\n    
return oper;\n  }
 \n\n  /**\n   * Get simple moving average of price.\n   */\n  public 
SimpleMovingAverage\nString, Double\n 
getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)\n 
 {\n    SimpleMovingAverage\nString, Double\n oper = dag.addOperator(name, new 
SimpleMovingAverage\nString, Double\n());\n    
oper.setWindowSize(appWindowCount);\n    oper.setType(Double.class);\n    
return oper;\n  }\n\n  /**\n   * Get console for output.\n   */\n  public 
InputPort\nObject\n getConsole(String name, /*String nodeName,*/ DAG dag, 
String prefix)\n  {\n    ConsoleOutputOperator oper = dag.addOperator(name, 
ConsoleOutputOperator.class);\n    oper.setStringFormat(prefix + \n: %s\n);\n   
 return oper.input;\n  }\n\n  /**\n   * Create Yahoo Finance Application DAG.\n 
  */\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  
{\n    
dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getStockTickInputOperator(
 \nStockTickInput\n, dag);\n    SumKeyVal\nString, Long\n dailyVolume = 
getDailyVolumeOperator(\nDailyVolume\n, dag);\n    
ConsolidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = 
getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow 
= getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    
SumKeyVal\nString, Long\n minuteVolume = 
getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = 
getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n 
priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, 
appWindowCountSMA);\n       DefaultPartitionCodec\nString, Double\n codec = new 
DefaultPartitionCodec\nString, Double\n();\n    
dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);\n    
dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);\n    
dag.addStream(\nprice\n, tick.price, quoteOperato
 r.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, tick.volume, 
dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, tick.time, 
quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, dailyVolume.sum, 
quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, quoteOperator.out, 
getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    
dag.addStream(\nhigh_low\n, highlow.range, chartOperator.in1);\n    
dag.addStream(\nvol_1min\n, minuteVolume.sum, chartOperator.in2);\n    
dag.addStream(\nchart_data\n, chartOperator.out, getConsole(\nchartConsole\n, 
dag, \nCHART\n));\n\n    dag.addStream(\nsma_price\n, priceSMA.doubleSMA, 
getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  
}\n\n}\n\n\n\n\nNote that we also set a user-specific sliding window for SMA 
that\nkeeps track of the previous N data points. \u00a0Do not confuse this with 
the\nattribute APPLICATION_WINDOW_COUNT.\n\n\nIn the rest of this chapter we 
will run through the process of\nrunnin
 g this application. We assume that \u00a0you are familiar with details\nof 
your Hadoop infrastructure. For installation\ndetails please refer to the 
\nInstallation Guide\n.\n\n\nRunning a Test Application\n\n\nWe will now 
describe how to run the yahoo\nfinance application\u00a0described above in 
different modes\n(local mode, single node on Hadoop, and multi-nodes on 
Hadoop).\n\n\nThe platform runs streaming applications under the control of 
a\nlight-weight Streaming Application Manager (STRAM). Each application 
has\nits own instance of STRAM. STRAM launches the application and\ncontinually 
provides run time monitoring, analysis, and takes action\nsuch as load scaling 
or outage recovery as needed. \u00a0We will discuss\nSTRAM in more detail in 
the next chapter.\n\n\nThe instructions below assume that the platform was 
installed in a\ndirectory \nINSTALL_DIR\n and the command line interface (CLI) 
will\nbe used to launch the demo application. An application can be run 
in\nlocal mode\u00
 a0(in IDE or from command line) or on a Hadoop cluster.\n\n\nTo start the Apex 
CLI run\n\n\nINSTALL_DIR\n/bin/apex\n\n\n\nThe command line prompt appears.  To 
start the application in local mode (the actual version number in the file name 
may differ)\n\n\napex\n launch -local 
\nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo terminate the 
application in local mode, enter Ctrl-C\n\n\nTu run the application on the 
Hadoop cluster (the actual version\nnumber in the file name may 
differ)\n\n\napex\n launch 
\nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo stop the 
application running in Hadoop, terminate it in the Apex CLI:\n\n\napex\n 
kill-app\n\n\n\nExecuting the application in either mode includes the 
following\nsteps. At a top level, STRAM (Streaming Application Manager) 
validates\nthe application (DAG), translates the logical plan to the physical 
plan\nand then launches the execution engine. The mode determines 
the\nresources needed and how how they are u
 sed.\n\n\nLocal Mode\n\n\nIn local mode, the application is run as a 
single-process\u00a0with multiple threads. Although a\nfew Hadoop classes are 
needed, there is no dependency on a Hadoop\ncluster or Hadoop services. The 
local file system is used in place of\nHDFS. This mode allows a quick run of an 
application in a single process\nsandbox, and hence is the most suitable to 
debug and analyze the\napplication logic. This mode is recommended for 
developing the\napplication and can be used for running applications within the 
IDE for\nfunctional testing purposes. Due to limited resources and lack 
\u00a0of\nscalability an application running in this single process mode is 
more\nlikely to encounter throughput bottlenecks. A distributed cluster 
is\nrecommended for benchmarking and production testing.\n\n\nHadoop 
Cluster\n\n\nIn this section we discuss various Hadoop cluster 
setups.\n\n\nSingle Node Cluster\n\n\nIn a single node Hadoop cluster all 
services are deployed on a\nsingle server
  (a developer can use his/her development machine as a\nsingle node cluster). 
The platform does not distinguish between a single\nor multi-node setup and 
behaves exactly the same in both cases.\n\n\nIn this mode, the resource 
manager, name node, data node, and node\nmanager occupy one process each. This 
is an example of running a\nstreaming application as a 
multi-process\u00a0application on the same server.\nWith prevalence of fast, 
multi-core systems, this mode is effective for\ndebugging, fine tuning, and 
generic analysis before submitting the job\nto a larger Hadoop cluster. In this 
mode, execution uses the Hadoop\nservices and hence is likely to identify 
issues that are related to the\nHadoop environment (such issues will not be 
uncovered in local mode).\nThe throughput will obviously not be as high as on a 
multi-node Hadoop\ncluster. Additionally, since each container (i.e. Java 
process) requires\na significant amount of memory, you will be able to run a 
much smaller\nnumber of
  containers than on a multi-node cluster.\n\n\nMulti-Node Cluster\n\n\nIn a 
multi-node Hadoop cluster all the services of Hadoop are\ntypically distributed 
across multiple nodes in a production or\nproduction-level test environment. 
Upon launch the application is\nsubmitted to the Hadoop cluster and executes as 
a  multi-processapplication on\u00a0multiple nodes.\n\n\nBefore you start 
deploying, testing and troubleshooting your\napplication on a cluster, you 
should ensure that Hadoop (version 2.6.0\nor later)\u00a0is properly installed 
and\nyou have basic skills for working with it.\n\n\n\n\nApache Apex Platform 
Overview\n\n\nStreaming Computational Model\n\n\nIn this chapter, we describe 
the the basics of the real-time streaming platform and its computational 
model.\n\n\nThe platform is designed to enable completely asynchronous real 
time computations\u00a0done in as unblocked a way as possible with\nminimal 
overhead .\n\n\nApplications running in the platform are represented by a D
 irected\nAcyclic Graph (DAG) made up of \u00a0operators and streams. All 
computations\nare done in memory on arrival of\nthe input data, with an option 
to save the output to disk (HDFS) in a\nnon-blocking way. The data that flows 
between operators consists of\natomic data elements. Each data element along 
with its type definition\n(henceforth called  schema) is\ncalled a 
tuple.\u00a0An application is a\ndesign of the flow of these tuples to and 
from\nthe appropriate compute units to enable the computation of the 
final\ndesired results.\u00a0A message queue (henceforth called\n\u00a0buffer 
server) manages tuples streaming\nbetween compute units in different 
processes.This server keeps track of\nall consumers, publishers, partitions, 
and enables replay. More\ninformation is given in later section.\n\n\nThe 
streaming application is monitored by a decision making entity\ncalled STRAM 
(streaming application\nmanager).\u00a0STRAM is designed to be a light 
weight\ncontroller that has minim
 al but sufficient interaction with the\napplication. This is done via periodic 
heartbeats. The\nSTRAM does the initial launch and periodically analyzes the 
system\nmetrics to decide if any run time action needs to be taken.\n\n\nA 
fundamental building block for the streaming platform\nis the concept of 
breaking up a stream into equal finite time slices\ncalled streaming windows. 
Each window contains the ordered\nset of tuples in that time slice. A typical 
duration of a window is 500\nms, but can be configured per application (the 
Yahoo! Finance\napplication configures this value in the  
properties.xml\u00a0file to be 1000ms = 1s). Each\nwindow is preceded by a 
begin_window\u00a0event and is terminated by an\nend_window\u00a0event, and is 
assigned\na unique window ID. Even though the platform performs computations 
at\nthe tuple level, bookkeeping is done at the window boundary, making 
the\ncomputations within a window an atomic event in the platform. \u00a0We 
can\nthink of each windo
 w as an  atomic\nmicro-batch\u00a0of tuples, to be processed together as 
one\natomic operation (See Figure 2). \u00a0\n\n\nThis atomic batching allows 
the platform to avoid the very steep\nper tuple bookkeeping cost and instead 
has a manageable per batch\nbookkeeping cost. This translates to higher 
throughput, low recovery\ntime, and higher scalability. Later in this document 
we illustrate how\nthe atomic micro-batch concept allows more efficient 
optimization\nalgorithms.\n\n\nThe platform also has in-built support 
for\napplication windows.\u00a0 An application window is part of 
the\napplication specification, and can be a small or large multiple of 
the\nstreaming window. \u00a0An example from our Yahoo! Finance test 
application\nis the moving average, calculated over a sliding application 
window of 5\nminutes which equates to 300 (= 5 * 60) streaming 
windows.\n\n\nNote that these two window concepts are distinct. \u00a0A 
streaming\nwindow is an abstraction of many tuples into a hig
 her atomic event for\neasier management. \u00a0An application window is a 
group of consecutive\nstreaming windows used for data aggregation (e.g. sum, 
average, maximum,\nminimum) on a per operator level.\n\n\n\n\nAlongside the 
platform,\u00a0a set of\npredefined, benchmarked standard library operator 
templates is provided\nfor ease of use and rapid development of 
application.\u00a0These\noperators are open sourced to Apache Software 
Foundation under the\nproject name \u201cMalhar\u201d as part of our efforts to 
foster community\ninnovation. These operators can be used in a DAG as is, while 
others\nhave properties\u00a0that can be set to specify the\ndesired 
computation. Those interested in details, should refer to\n\nApex-Malhar 
operator library\n.\n\n\nThe platform is a Hadoop YARN native\napplication. It 
runs in a Hadoop cluster just like any\nother YARN application (MapReduce etc.) 
and is designed to seamlessly\nintegrate with rest of Hadoop technology stack. 
It leverages Hadoop 
 as\nmuch as possible and relies on it as its distributed operating 
system.\nHadoop dependencies include resource management, 
compute/memory/network\nallocation, HDFS, security, fault tolerance, 
monitoring, metrics,\nmulti-tenancy, logging etc. Hadoop classes/concepts are 
reused as much\nas possible.  The aim is to enable enterprises\nto leverage 
their existing Hadoop infrastructure for real time streaming\napplications. The 
platform is designed to scale with big\ndata applications and scale with 
Hadoop.\n\n\nA streaming application is an asynchronous execution 
of\ncomputations across distributed nodes. All computations are done 
in\nparallel on a distributed cluster. The computation model is designed to\ndo 
as many parallel computations as possible in a non-blocking fashion.\nThe task 
of monitoring of the entire application is done on (streaming)\nwindow 
boundaries with a streaming window as an atomic entity. A window\ncompletion is 
a quantum of work done. There is no assumption that
  an\noperator can be interrupted at precisely a particular tuple or 
window.\n\n\nAn operator itself also\ncannot assume or predict the exact time a 
tuple that it emitted would\nget consumed by downstream operators. The operator 
processes the tuples\nit gets and simply emits new tuples based on its business 
logic. The\nonly guarantee it has is that the upstream operators are 
processing\neither the current or some later window, and the downstream 
operator is\nprocessing either the current or some earlier window. The 
completion of\na window (i.e. propagation of the  end_window\u00a0event through 
an operator) in any\noperator guarantees that all upstream operators have 
finished processing\nthis window. Thus, the end_window\u00a0event is blocking 
on an operator\nwith multiple outputs, and is a synchronization point in the 
DAG. The\n begin_window\u00a0event does not have\nany such restriction, a 
single begin_window\u00a0event from any upstream operator\ntriggers the 
operator to start proc
 essing tuples.\n\n\nStreaming Application Manager (STRAM)\n\n\nStreaming 
Application Manager (STRAM) is the Hadoop YARN native\napplication master. 
STRAM is the first process that is activated upon\napplication launch and 
orchestrates the streaming application on the\nplatform. STRAM is a lightweight 
controller process. The\nresponsibilities of STRAM include\n\n\n\n\n\n\nRunning 
the Application\n\n\n\n\nRead the\u00a0logical plan\u00a0of the application 
(DAG) submitted by the client\n\n\nValidate the logical plan\n\n\nTranslate the 
logical plan into a physical plan, where certain operators may  be partitioned 
(i.e. replicated) to multiple operators for  handling load.\n\n\nRequest 
resources (Hadoop containers) from Resource Manager,\n    per physical 
plan\n\n\nBased on acquired resources and application attributes, create\n    
an execution plan\u00a0by partitioning the DAG into fragments,\n    each 
assigned to different containers.\n\n\nExecutes the application by deploying 
each fra
 gment to\n    its container. Containers then start stream processing and run\n 
   autonomously, processing one streaming window after another. Each\n    
container is represented as an instance of the  StreamingContainer\u00a0class, 
which updates\n    STRAM via the heartbeat protocol and processes directions 
received\n    from STRAM.\n\n\n\n\n\n\n\n\nContinually monitoring the 
application via heartbeats from each StreamingContainer\n\n\n\n\nCollecting 
Application System Statistics and Logs\n\n\nLogging all application-wide 
decisions taken\n\n\nProviding system data on the state of the application via 
a  Web Service.\n\n\n\n\nSupporting Fault Tolerance\n\n\na.  Detecting a node 
outage\nb.  Requesting a replacement resource from the Resource Manager\n    
and scheduling state restoration for the streaming operators\nc.  Saving state 
to Zookeeper\n\n\n\n\n\n\nSupporting Dynamic Partitioning:\u00a0Periodically 
evaluating the SLA and modifying the physical plan if required\n    (logical pl
 an does not change).\n\n\n\n\nEnabling Security:\u00a0Distributing security 
tokens for distributed components of the execution engine\n    and securing web 
service requests.\n\n\nEnabling Dynamic modification of DAG: In the future, we 
intend to allow for user initiated\n    modification of the logical plan to 
allow for changes to the\n    processing logic and functionality.\n\n\n\n\nAn 
example of the Yahoo! Finance Quote application scheduled on a\ncluster of 5 
Hadoop containers (processes) is shown in Figure 3.\n\n\n\n\nAn example for the 
translation from a logical plan to a physical\nplan and an execution plan for a 
subset of the application is shown in\nFigure 4.\n\n\n\n\nHadoop 
Components\n\n\nIn this section we cover some aspects of Hadoop that 
your\nstreaming application interacts with. This section is not meant 
to\neducate the reader on Hadoop, but just get the reader acquainted with\nthe 
terms. We strongly advise readers to learn Hadoop from other\nsources.\n\n\nA 
streaming 
 application runs as a native Hadoop 2.x application.\nHadoop 2.x does not 
differentiate between a map-reduce job and other\napplications, and hence as 
far as Hadoop is concerned, the streaming\napplication is just another job. 
This means that your application\nleverages all the bells and whistles Hadoop 
provides and is fully\nsupported within Hadoop technology stack. The platform 
is responsible\nfor properly integrating itself with the relevant components of 
Hadoop\nthat exist today and those that may emerge in the future\n\n\nAll 
investments that leverage multi-tenancy (for example quotas\nand queues), 
security (for example kerberos), data flow integration (for\nexample copying 
data in-out of HDFS), monitoring, metrics collections,\netc. will require no 
changes when streaming applications run 
on\nHadoop.\n\n\nYARN\n\n\nYARN\nis\nthe core library of Hadoop 2.x that is 
tasked with resource management\nand works as a distributed application 
framework. In this section we\nwill walk thr
 ough Yarn's components. In Hadoop 2.x, the old jobTracker\nhas been replaced 
by a combination of ResourceManager (RM) and\nApplicationMaster 
(AM).\n\n\nResource Manager (RM)\n\n\nResourceManager\n(RM)\nmanages all the 
distributed resources. It allocates and arbitrates all\nthe slots and the 
resources (cpu, memory, network) of these slots. It\nworks with per-node 
NodeManagers (NMs) and per-application\nApplicationMasters (AMs). Currently 
memory usage is monitored by RM; in\nupcoming releases it will have CPU as well 
as network management. RM is\nshared by map-reduce and streaming applications. 
Running streaming\napplications requires no changes in the RM.\n\n\nApplication 
Master (AM)\n\n\nThe AM is the watchdog or monitoring process for your 
application\nand has the responsibility of negotiating resources with RM 
and\ninteracting with NodeManagers to get the allocated containers 
started.\nThe AM is the starting point of your application and is considered 
user\ncode (not system Hadoop
  code). The AM itself runs in one container. All\nresource management within 
the application are managed by the AM. This\nis a critical feature for Hadoop 
2.x where tasks done by jobTracker in\nHadoop 1.0 have been distributed 
allowing Hadoop 2.x to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN 
ApplicationManager.\n\n\nNode Managers (NM)\n\n\nThere is one 
\nNodeManager\n(NM)\nper node in the cluster. All the containers (i.e. 
processes) on that\nnode are monitored by the NM. It takes instructions from RM 
and manages\nresources of that node as per RM instructions. NMs interactions 
are same\nfor map-reduce and for streaming applications. Running 
streaming\napplications requires no changes in the NM.\n\n\nRPC 
Protocol\n\n\nCommunication among RM, AM, and NM is done via the Hadoop 
RPC\nprotocol. Streaming applications use the same protocol to send 
their\ndata. No changes are needed in RPC support provided by Hadoop to 
enable\ncommunication done by components of your application.\
 n\n\nHDFS\n\n\nHadoop includes a highly fault tolerant, high 
throughput\ndistributed file system (\nHDFS\n).\nIt runs on commodity hardware, 
and your streaming application will, by\ndefault, use it. There is no 
difference between files created by a\nstreaming application and those created 
by map-reduce.\n\n\nDeveloping An Application\n\n\nIn this chapter we describe 
the methodology to develop an\napplication using the Realtime Streaming 
Platform. The platform was\ndesigned to make it easy to build and launch 
sophisticated streaming\napplications with the developer having to deal only 
with the\napplication/business logic. The platform deals with details of where 
to\nrun what operators on which servers and how to correctly route streams\nof 
data among them.\n\n\nDevelopment Process\n\n\nWhile the platform does not 
mandate a specific methodology or set\nof development tools, we have 
recommendations to maximize productivity\nfor the different phases of 
application development.\n\n\nDesi
 gn\n\n\n\n\nIdentify common, reusable operators. Use a library\n    if 
possible.\n\n\nIdentify scalability and performance requirements before\n    
designing the DAG.\n\n\nLeverage attributes that the platform supports for 
scalability\n    and performance.\n\n\nUse operators that are benchmarked and 
tested so that later\n    surprises are minimized. If you have glue code, 
create appropriate\n    unit tests for it.\n\n\nUse THREAD_LOCAL locality for 
high throughput streams. If all\n    the operators on that stream cannot fit in 
one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n  
  NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completly. The 
former uses intra-process communication to also avoid\n    
serialization-deserialization overhead.\n\n\nThe overall throughput and 
latencies are are not necessarily\n    correlated to the number of operators in 
a simple way -- the\n    relationship is more nuanced. A lot depends on how 
much work\n    in
 dividual operators are doing, how many are able to operate in\n    parallel, 
and how much data is flowing through the arcs of the DAG.\n    It is, at times, 
better to break a computation down into its\n    constituent simple parts and 
then stitch them together via streams\n    to better utilize the compute 
resources of the cluster. Decide on a\n    per application basis the fine line 
between complexity of each\n    operator vs too many streams. Doing multiple 
computations in one\n    operator does save network I/O, while operators that 
are too complex\n    are hard to maintain.\n\n\nDo not use operators that 
depend on the order of two streams\n    as far as possible. In such cases 
behavior is not idempotent.\n\n\nPersist key information to HDFS if possible; 
it may be useful\n    for debugging later.\n\n\nDecide on an appropriate fault 
tolerance mechanism. If some\n    data loss is acceptable, use the at-most-once 
mechanism as it has\n    fastest recovery.\n\n\n\n\nCreating New Proje
 ct\n\n\nPlease refer to the \nApex Application Packages\n\u00a0for\nthe basic 
steps for creating a new project.\n\n\nWriting the application 
code\n\n\nPreferably use an IDE (Eclipse, Netbeans etc.) that allows you 
to\nmanage dependencies and assists with the Java coding. Specific 
benefits\ninclude ease of managing operator library jar files, individual 
operator\nclasses, ports and properties. It will also highlight and assist 
to\nrectify issues such as type mismatches when adding streams 
while\ntyping.\n\n\nTesting\n\n\nWrite test cases with JUnit or similar test 
framework so that code\nis tested as it is written. For such testing, the DAG 
can run in local\nmode within the IDE. Doing this may involve writing mock 
input or output\noperators for the integration points with external systems. 
For example,\ninstead of reading from a live data stream, the application in 
test mode\ncan read from and write to files. This can be done with a 
single\napplication DAG by instrumenting a test mod
 e using settings in the\nconfiguration that is passed to the application 
factory\ninterface.\n\n\nGood test coverage will not only eliminate basic 
validation errors\nsuch as missing port connections or property constraint 
violations, but\nalso validate the correct processing of the data. The same 
tests can be\nre-run whenever the application or its dependencies change 
(operator\nlibraries, version of the platform etc.)\n\n\nRunning an 
application\n\n\nThe platform provides a commandline tool called \napex\n for 
managing applications (launching,\nkilling, viewing, etc.). This tool was 
already discussed above briefly\nin the section entitled Running the Test 
Application. It will introspect\nthe jar file specified with the launch command 
for applications (classes\nthat implement ApplicationFactory) or properties 
files that define\napplications. It will also deploy the dependency jar files 
from the\napplication package to the cluster.\n\n\nApex CLI can run the 
application in local mode 
 (i.e. outside a\ncluster). It is recommended to first run the application in 
local mode\nin the development environment before launching on the Hadoop 
cluster.\nThis way some of the external system integration and 
correct\nfunctionality of the application can be verified in an easier to 
debug\nenvironment before testing distributed mode.\n\n\nFor more details on 
CLI please refer to the \nApex CLI Guide\n.\n\n\nApplication API\n\n\nThis 
section introduces the API to write a streaming application.\nThe work involves 
connecting operators via streams to form the logical\nDAG. The steps 
are\n\n\n\n\n\n\nInstantiate an application (DAG)\n\n\n\n\n\n\n(Optional) Set 
Attributes\n\n\n\n\nAssign application name\n\n\nSet any other attributes as 
per application requirements\n\n\n\n\n\n\n\n\nCreate/re-use and instantiate 
operators\n\n\n\n\nAssign operator name that is unique within the  
application\n\n\nDeclare schema upfront for each operator (and thereby its 
ports)\n\n\n(Optional) Set properti
 es\u00a0 and attributes on the dag as per specification\n\n\nConnect ports of 
operators via streams\n\n\nEach stream connects one output port of an operator 
to one or  more input ports of other operators.\n\n\n(Optional) Set attributes 
on the streams\n\n\n\n\n\n\n\n\n\n\n\n\nTest the 
application.\n\n\n\n\n\n\nThere are two methods to create an application, 
namely Java, and\nProperties file. Java API is for applications being developed 
by humans,\nand properties file (Hadoop like) is more suited for DAGs generated 
by\ntools.\n\n\nJava API\n\n\nThe Java API is the most common way to create a 
streaming\napplication. It is meant for application developers who prefer 
to\nleverage the features of Java, and the ease of use and 
enhanced\nproductivity provided by IDEs like NetBeans or Eclipse. Using Java 
to\nspecify the application provides extra validation abilities of 
Java\ncompiler, such as compile time checks for type safety at the time 
of\nwriting the code. Later in this chapter you can
  read more about\nvalidation support in the platform.\n\n\nThe developer 
specifies the streaming application by implementing\nthe ApplicationFactory 
interface, which is how platform tools (CLI etc.)\nrecognize and instantiate 
applications. Here we show how to create a\nYahoo! Finance application that 
streams the last trade price of a ticker\nand computes the high and low price 
in every 1 min window. Run above\n test application\u00a0to execute the\nDAG in 
local mode within the IDE.\n\n\nLet us revisit how the Yahoo! Finance test 
application constructs the DAG:\n\n\npublic class Application implements 
StreamingApplication\n{\n\n  ...\n\n  @Override\n  public void populateDAG(DAG 
dag, Configuration conf)\n  {\n    
dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n
    StockTickInput tick = getStockTickInputOperator(\nStockTickInput\n, dag);\n 
   SumKeyVal\nString, Long\n dailyVolume = 
getDailyVolumeOperator(\nDailyVolume\n, dag);\n    Conso
 lidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = 
getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow 
= getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    
SumKeyVal\nString, Long\n minuteVolume = 
getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    
ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = 
getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n 
priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, 
appWindowCountSMA);\n\n    dag.addStream(\nprice\n, tick.price, 
quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, 
tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, 
tick.time, quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, 
dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, 
quoteOperator.out, getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    
dag.addStream(\nhigh_low\n, hi
 ghlow.range, chartOperator.in1);\n    dag.addStream(\nvol_1min\n, 
minuteVolume.sum, chartOperator.in2);\n    dag.addStream(\nchart_data\n, 
chartOperator.out, getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    
dag.addStream(\nsma_price\n, priceSMA.doubleSMA, 
getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  
}\n}\n\n\n\n\nJSON File DAG Specification\n\n\nIn addition to Java, you can 
also specify the DAG using JSON, provided the operators in the DAG are present 
in the dependency jars. Create src/main/resources/app directory under your app 
package project, and put your JSON files there. This is the specification of a 
JSON file that specifies an application.\n\n\nCreate a json file under 
src/main/resources/app, For example \nmyApplication.json\n\n\n{\n  
\ndescription\n: \n{application description}\n,\n  \noperators\n: [\n    {\n    
  \nname\n: \n{operator name}\n,\n      \nclass\n: \n{fully qualified class 
name of the operator}\n,\n      \nproperties\n: {\n  
       \n{property key}\n: \n{property value}\n,\n        ...\n      }\n    }, 
...\n  ],\n  \nstreams\n: [\n    {\n      \nname\n: \n{stream name}\n,\n      
\nsource\n: {\n        \noperatorName\n: \n{source operator name}\n,\n        
\nportName\n: \n{source operator output port name}\n\n      }\n      \nsinks\n: 
[\n        {\n          \noperatorName\n: \n{sink operator name}\n,\n          
\nportName\n: \n{sink operator input port name}\n\n        }, ...\n      ]\n    
}, ...\n  ]\n}\n\n\n\n\n\n\n\nThe name of the JSON file is taken as the name of 
the application.\n\n\nThe \ndescription\n field is the description of the 
application and is optional.\n\n\nThe \noperators\n field is the list of 
operators the application has. You can specifiy the name, the Java class, and 
the properties of each operator here.\n\n\nThe \nstreams\n field is the list of 
streams that connects the operators together to form the DAG. Each stream 
consists of the stream name, the operator and port that it connec
 ts from, and the list of operators and ports that it connects to. Note that 
you can connect from \none\n output port of an operator to \nmultiple\n 
different input ports of different operators.\n\n\n\n\nIn Apex Malhar, there is 
an \nexample\n in the Pi Demo doing just that.\n\n\nProperties File DAG 
Specification\n\n\nThe platform also supports specification of a DAG via a 
properties\nfile. The aim here to make it easy for tools to create and run 
an\napplication. This method of specification does not have the Java\ncompiler 
support of compile time check, but since these applications\nwould be created 
by software, they should be correct by construction.\nThe syntax is derived 
from Hadoop properties and should be easy for\nfolks who are used to creating 
software that integrated with\nHadoop.\n\n\nUnder the src/main/resources/app 
directory (create if it doesn't exist), create a properties file.\nFor example 
\nmyApplication.properties\n\n\n# input operator that reads from a file\napex.op
 
erator.inputOp.classname=com.acme.SampleInputOperator\napex.operator.inputOp.fileName=somefile.txt\n\n#
 output operator that writes to the 
console\napex.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# 
stream connecting both 
operators\napex.stream.inputStream.source=inputOp.outputPort\napex.stream.inputStream.sinks=outputOp.inputPort\n\n\n\n\nAbove
 snippet is intended to convey the basic idea of specifying\nthe DAG using 
properties file. Operators would come from a predefined\nlibrary and referenced 
in the specification by class name and port names\n(obtained from the library 
providers documentation or runtime\nintrospection by tools). For those 
interested in details, see later\nsections and refer to the  Operation 
and\nInstallation Guide\u00a0mentioned above.\n\n\nAttributes\n\n\nAttributes 
impact the runtime behavior of the application. They do\nnot impact the 
functionality. An example of an attribute is application\nname. Setting it 
changes the application name. An
 other example is\nstreaming window size. Setting it changes the streaming 
window size from\nthe default value to the specified value. Users cannot add 
new\nattributes, they can only choose from the ones that come packaged 
and\npre-supported by the platform. Details of attributes are covered in the\n 
Operation and Installation\nGuide.\n\n\nOperators\n\n\nOperators\u00a0are basic 
compute units.\nOperators process each incoming tuple and emit zero or more 
tuples on\noutput ports as per the business logic. The data flow, 
connectivity,\nfault tolerance (node outage), etc. is taken care of by the 
platform. As\nan operator developer, all that is needed is to figure out what 
to do\nwith the incoming tuple and when (and which output port) to send out 
a\nparticular output tuple. Correctly designed operators will most likely\nget 
reused. Operator design needs care and foresight. For details, refer\nto the  
\nOperator Developer Guide\n. As an application developer you need to connect 
operators\
 nin a way that it implements your business logic. You may also 
require\noperator customization for functionality and use attributes 
for\nperformance/scalability etc.\n\n\nAll operators process tuples 
asynchronously in a distributed\ncluster. An operator cannot assume or predict 
the exact time a tuple\nthat it emitted will get consumed by a downstream 
operator. An operator\nalso cannot predict the exact time when a tuple arrives 
from an upstream\noperator. The only guarantee is that the upstream operators 
are\nprocessing the current or a future window, i.e. the windowId of 
upstream\noperator is equals or exceeds its own windowId. Conversely the 
windowId\nof a downstream operator is less than or equals its own windowId. 
The\nend of a window operation, i.e. the API call to endWindow on an 
operator\nrequires that all upstream operators have finished processing 
this\nwindow. This means that completion of processing a window propagates 
in\na blocking fashion through an operator. Later sec
 tions provides more\ndetails on streams and data flow of tuples.\n\n\nEach 
operator has a unique name within the DAG as provided by the\nuser. This is the 
name of the operator in the logical plan. The name of\nthe operator in the 
physical plan is an integer assigned to it by STRAM.\nThese integers are use 
the sequence from 1 to N, where N is total number\nof physically unique 
operators in the DAG. \u00a0Following the same rule,\neach partitioned instance 
of a logical operator has its own integer as\nan id. This id along with the 
Hadoop container name uniquely identifies\nthe operator in the execution plan 
of the DAG. The logical names and the\nphysical names are required for web 
service support. Operators can be\naccessed via both names. These same names 
are used while interacting\nwith apex cli\u00a0to access an operator.\nIdeally 
these names should be self-descriptive. For example in Figure 1,\nthe node 
named \u201cDaily volume\u201d has a physical identifier of 2.\n\n\nOperator I
 nterface\n\n\nOperator interface in a DAG consists of 
ports,\u00a0properties,\u00a0and attributes.\nOperators interact with other 
components of the DAG via ports. Functional behavior of the operators\ncan be 
customized via parameters. Run time performance and physical\ninstantiation is 
controlled by attributes. Ports and parameters are\nfields (variables) of the 
Operator class/object, while attributes are\nmeta information that is attached 
to the operator object via an\nAttributeMap. An operator must have at least one 
port. Properties are\noptional. Attributes are provided by the platform and 
always have a\ndefault value that enables normal functioning of 
operators.\n\n\nPorts\n\n\nPorts are connection points by which an operator 
receives and\nemits tuples. These should be transient objects instantiated in 
the\noperator object, that implement particular interfaces. Ports should 
be\ntransient as they contain no state. They have a pre-defined schema and\ncan 
only be connected to other
  ports with the same schema. An input port\nneeds to implement the interface  
Operator.InputPort\u00a0and\ninterface Sink. A default\nimplementation of these 
is provided by the abstract class DefaultInputPort. An output port needs 
to\nimplement the interface  Operator.OutputPort. A default implementation\nof 
this is provided by the concrete class DefaultOutputPort. These two are a quick 
way to\nimplement the above interfaces, but operator developers have the 
option\nof providing their own implementations.\n\n\nHere are examples of an 
input and an output port from the 
operator\nSum.\n\n\n@InputPortFieldAnnotation(name = \ndata\n)\npublic final 
transient DefaultInputPort\nV\n data = new DefaultInputPort\nV\n() {\n  
@Override\n  public void process(V tuple)\n  {\n    ...\n  
}\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient 
DefaultOutputPort\nV\n sum = new DefaultOutputPort\nV\n(){ \u2026 
};\n\n\n\n\nThe process call is in the Sink interface. An emit on an output\n
 port is done via emit(tuple) call. For the above example it would 
be\nsum.emit(t), where the type of t is the generic parameter V.\n\n\nThere is 
no limit on how many ports an operator can have. However\nany operator must 
have at least one port. An operator with only one port\nis called an Input 
Adapter if it has no input port and an Output Adapter\nif it has no output 
port. These are special operators needed to get/read\ndata from outside 
system/source into the application, or push/write data\ninto an outside 
system/sink. These could be in Hadoop or outside of\nHadoop. These two 
operators are in essence gateways for the streaming\napplication to communicate 
with systems outside the application.\n\n\nPort connectivity can be validated 
during compile time by adding\nPortFieldAnnotations shown above. By default all 
ports have to be\nconnected, to allow a port to go unconnected, you need to 
add\n\u201coptional=true\u201d to the annotation.\n\n\nAttributes can be 
specified for ports that
  affect the runtime\nbehavior. An example of an attribute is parallel 
partition that specifes\na parallel computation flow per partition. It is 
described in detail in\nthe Parallel Partitions section. Another example is 
queue capacity that specifies the buffer size for the\nport. Details of 
attributes are covered in  Operation and Installation 
Guide.\n\n\nProperties\n\n\nProperties are the abstractions by which functional 
behavior of an\noperator can be customized. They should be non-transient 
objects\ninstantiated in the operator object. They need to be non-transient 
since\nthey are part of the operator state and re-construction of the 
operator\nobject from its checkpointed state must restore the operator to 
the\ndesired state. Properties are optional, i.e. an operator may or may 
not\nhave properties; they are part of user code and their values are 
not\ninterpreted by the platform in any way.\n\n\nAll non-serializable objects 
should be declared transient.\nExamples include sockets,
  session information, etc. These objects should\nbe initialized during setup 
call, which is called every time the\noperator is 
initialized.\n\n\nAttributes\n\n\nAttributes are values assigned to the 
operators that impact\nrun-time. This includes things like the number of 
partitions, at most\nonce or at least once or exactly once recovery modes, etc. 
Attributes do\nnot impact functionality of the operator. Users can change 
certain\nattributes in runtime. Users cannot add attributes to operators; 
they\nare pre-defined by the platform. They are interpreted by the 
platform\nand thus cannot be defined in user created code (like 
properties).\nDetails of attributes are covered in  \nConfiguration 
Guide\n.\n\n\nOperator State\n\n\nThe state of an operator is defined as the 
data that it transfers\nfrom one window to a future window. Since the computing 
model of the\nplatform is to treat windows like micro-batches, the operator 
state can\nbe checkpointed every Nth window, or every T units of 
 time, where T is significantly greater\nthan the streaming window. \u00a0When 
an operator is checkpointed, the entire\nobject is written to HDFS. \u00a0The 
larger the amount of state in an\noperator, the longer it takes to recover from 
a failure. A stateless\noperator can recover much quicker than a stateful one. 
The needed\nwindows are preserved by the upstream buffer server and are used 
to\nrecompute the lost windows, and also rebuild the buffer server in 
the\ncurrent container.\n\n\nThe distinction between Stateless and Stateful is 
based solely on\nthe need to transfer data in the operator from one window to 
the next.\nThe state of an operator is independent of the number of 
ports.\n\n\nStateless\n\n\nA Stateless operator is defined as one where no data 
is needed to\nbe kept at the end of every window. This means that all the 
computations\nof a window can be derived from all the tuples the operator 
receives\nwithin that window. This guarantees that the output of any window can 
be
 \nreconstructed by simply replaying the tuples that arrived in that\nwindow. 
Stateless operators are more efficient in terms of fault\ntolerance, and cost 
to achieve SLA.\n\n\nStateful\n\n\nA Stateful operator is defined as one where 
data is needed to be\nstored at the end of a window for computations occurring 
in later\nwindow; a common example is the computation of a sum of values in 
the\ninput tuples.\n\n\nOperator API\n\n\nThe Operator API consists of methods 
that operator developers may\nneed to override. In this section we will discuss 
the Operator APIs from\nthe point of view of an application developer. 
Knowledge of how an\noperator works internally is critical for writing an 
application. Those\ninterested in the details should refer to  Malhar Operator 
Developer Guide.\n\n\nThe APIs are available in three modes, namely Single 
Streaming\nWindow, Sliding Application Window, and Aggregate Application 
Window.\nThese are not mutually exclusive, i.e. an operator can use single\ns
 treaming window as well as sliding application window. A physical\ninstance of 
an operator is always processing tuples from a single\nwindow. The processing 
of tuples is guaranteed to be sequential, no\nmatter which input port the 
tuples arrive on.\n\n\nIn the later part of this section we will evaluate three 
common\nuses of streaming windows by applications. They have 
different\ncharacteristics and implications on optimization and recovery 
mechanisms\n(i.e. algorithm used to recover a node after outage) as discussed 
later\nin the section.\n\n\nStreaming Window\n\n\nStreaming window is atomic 
micro-batch computation period. The API\nmethods relating to a streaming window 
are as follows\n\n\npublic void process(\ntuple_type\n tuple) // Called on the 
input port on which the tuple arrives\npublic void beginWindow(long windowId) 
// Called at the start of the window as soon as the first begin_window tuple 
arrives\npublic void endWindow() // Called at the end of the window after 
end_windo
 w tuples arrive on all input ports\npublic void setup(OperatorContext context) 
// Called once during initialization of the operator\npublic void teardown() // 
Called once when the operator is being shutdown\n\n\n\n\nA tuple can be emitted 
in any of the three streaming run-time\ncalls, namely beginWindow, process, and 
endWindow but not in setup or\nteardown.\n\n\nAggregate Application 
Window\n\n\nAn operator with an aggregate window is stateful within 
the\napplication window timeframe and possibly stateless at the end of 
that\napplication window. An size of an aggregate application window is 
an\noperator attribute and is defined as a multiple of the streaming 
window\nsize. The platform recognizes this attribute and optimizes the 
operator.\nThe beginWindow, and endWindow calls are not invoked for those 
streaming\nwindows that do not align with the application window. For example 
in\ncase of streaming window of 0.5 second and application window of 5\nminute, 
an application window spans
  600 streaming windows (5*60*2 =\n600). At the start of the sequence of these 
600 atomic streaming\nwindows, a beginWindow gets invoked, and at the end of 
these 600\nstreaming windows an endWindow gets invoked. All the 
intermediate\nstreaming windows do not invoke beginWindow or endWindow. 
Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off streaming 
windows.\nFor example if operators are being checkpointed say on an average 
every\n30th window, then the above application window would have about 
20\ncheckpoints.\n\n\nSliding Application Window\n\n\nA sliding window is 
computations that requires previous N\nstreaming windows. After each streaming 
window the Nth past window is\ndropped and the new window is added to the 
computation. An operator with\nsliding window is a stateful operator at end of 
any window. The sliding\nwindow period is an attribute and is a multiple of 
streaming window. The\nplatform recognizes this attribute and leverages it 
during bookkeeping.\nA sli
 ding aggregate window with tolerance to data loss does not have a\nvery high 
bookkeeping cost. The cost of all three recovery mechanisms,\n at most 
once\u00a0(data loss tolerant),\nat least once\u00a0(data loss\nintolerant), 
and exactly once\u00a0(data\nloss intolerant and no extra computations) is same 
as recovery\nmechanisms based on streaming window. STRAM is not able to 
leverage this\noperator for any extra optimization.\n\n\nSingle vs Multi-Input 
Operator\n\n\nA single-input operator by definition has a single 
upstream\noperator, since there can only be one writing port for a stream. 
\u00a0If an\noperator has a single upstream operator, then the beginWindow on 
the\nupstream also blocks the beginWindow of the single-input operator. For\nan 
operator to start processing any window at least one upstream\noperator has to 
start processing that window. A multi-input operator\nreads from more than one 
upstream ports. Such an operator would start\nprocessing as soon as the first 
begin_w
 indow event arrives. However the\nwindow would not close (i.e. invoke 
endWindow) till all ports receive\nend_window events for that windowId. Thus 
the end of a window is a\nblocking event. As we saw earlier, a multi-input 
operator is also the\npoint in the DAG where windows of all upstream operators 
are\nsynchronized. The windows (atomic micro-batches) from a faster (or 
just\nahead in processing) upstream operators are queued up till the 
slower\nupstream operator catches up. STRAM monitors such bottlenecks and 
takes\ncorrective actions. The platform ensures minimal delay, i.e 
processing\nstarts as long as at least one upstream operator has 
started\nprocessing.\n\n\nRecovery Mechanisms\n\n\nApplication developers can 
set any of the recovery mechanisms\nbelow to deal with node outage. In general, 
the cost of recovery depends\non the state of the operator, while data 
integrity is dependant on the\napplication. The mechanisms are per window as 
the platform treats\nwindows as atomic comp
 ute units. Three recovery mechanisms are\nsupported, 
namely\n\n\n\n\nAt-least-once: All atomic batches are processed at least 
once.\n    No data loss occurs.\n\n\nAt-most-once: All atomic batches are 
processed at most once.\n    Data loss is possible; this is the most efficient 
setting.\n\n\nExactly-once: All atomic batches are processed exactly once.\n    
No data loss occurs; this is the least efficient setting since\n    additional 
work is needed to ens

<TRUNCATED>

Reply via email to