Application Developer Guide
===========================

Real-time big data processing is not only important but has become
critical for businesses which depend on accurate and timely analysis of
their business data. A few businesses have turned to very expensive
solutions, such as building an in-house, real-time analytics infrastructure
supported by an internal development team, or buying expensive
proprietary software. A large number of businesses address the
requirement simply by making Hadoop run their batch jobs in smaller
iterations. Over the last few years, Hadoop has become ubiquitous in the
big data processing space, replacing expensive proprietary hardware and
software solutions for massive data processing with very cost-effective,
fault-tolerant, open-source, and commodity-hardware-based solutions.
While Hadoop has been a game changer for companies, it is primarily a
batch-oriented system and does not yet have a viable option for
real-time data processing. Most companies with real-time data
processing needs end up having to build customized solutions in addition to
their Hadoop infrastructure.

The DataTorrent platform is designed to process massive amounts of
real-time events natively in Hadoop. This can be event ingestion,
processing, and aggregation for real-time data analytics, or it can be
real-time business logic decisioning such as cell tower load balancing,
real-time ad bidding, or fraud detection. The platform has the ability
to repair itself in real time (without data loss) if hardware fails, and
to adapt to changes in load by adding and removing computing resources
automatically.

DataTorrent is a native Hadoop application. It runs as a YARN
(Hadoop 2.x) application and leverages Hadoop as a distributed operating
system. All the basic distributed operating system capabilities of
Hadoop, such as resource allocation ([Resource Manager](#h.1ksv4uv)),
distributed file system ([HDFS](#h.3j2qqm3)), [multi-tenancy](#h.3q5sasy),
[security](#h.3q5sasy), [fault-tolerance](#h.2nusc19), [scalability](#h.34g0dwd), etc.,
are supported natively in all streaming applications. Just as Hadoop
for map-reduce handles all the details of the job, allowing you
to focus only on writing the application (the mapper and reducer
functions), the platform handles all the details of streaming execution,
allowing you to focus only on your business logic. Using the platform
removes the need to maintain separate clusters for real-time
applications.

In the platform, building a streaming application is easy
and intuitive. The application is represented as a Directed
Acyclic Graph (DAG) of computation units called [operators](#h.3o7alnk), interconnected
by data-flow edges called [streams](#h.nmf14n). The operators process input
streams and produce output streams. A library of common operators is
provided to enable quick application development. In case the desired
processing is not available in the operator library, one can easily
write a custom operator.
We refer those interested in creating their own
operators to the [Operator Development Guide](operator_development.md).

Running A Test Application
=======================================

This chapter will help you with a quick start on running an
application. If you are starting with the platform for the first time,
it would be informative to open an existing application and see it run.
Do the following steps to run the PI demo, which computes the value of
PI in a simple manner:

1. Open up platform files in your IDE (for example NetBeans or Eclipse)
2. Open Demos project
3. Open Test Packages and run ApplicationTest.java in pi package
4. See the results in your system console

Congratulations, you just ran your first real-time streaming demo
:) This demo is very simple and has four operators. The first operator
emits random integers between 0 and 30,000. The second operator receives
these values and emits a hashmap with x and y values each time it
receives two values. The third operator takes these values and computes
x\*\*2+y\*\*2. The last operator counts how many computed values from
the previous operator were less than or equal to 30,000\*\*2. Assuming
this count is N, PI is then approximated as 4\*N/(number of values
received), since a point chosen uniformly in the square falls inside the
quarter circle with probability PI/4.
Here is the code snippet for the PI application. This code populates the
DAG. Do not worry about what each line does, we will cover these
concepts later in this document.


```java
// Generates random numbers
int maxValue = 30000;
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
rand.setMinvalue(0);
rand.setMaxvalue(maxValue);

// Generates a round robin HashMap of "x" and "y"
RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
rrhm.setKeys(new String[] { "x", "y" });

// Calculates pi from x and y
JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
calc.setPassThru(false);
calc.put("i",0);
calc.put("count",0);
calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }");
calc.setInvoke("pi");
dag.addStream("rand_rrhm", rand.integer_data, rrhm.data);
dag.addStream("rrhm_calc", rrhm.map, calc.inBindings);

// puts results on system console
ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("rand_console",calc.result, console.input);
```


You can review the other demos and see what they do. The examples
given in the Demos project cover various features of the platform and we
strongly encourage you to read these to familiarize yourself with the
platform. In the remaining part of this document we will go through
details needed for you to develop and run streaming applications in
Malhar.

Test Application: Yahoo! Finance Quotes
----------------------------------------------------

The PI application was to
get you started. It is a basic application and does not fully illustrate
the features of the platform. For the purpose of describing concepts, we
will consider the test application shown in Figure 1. The application
downloads tick data from [Yahoo! Finance](http://finance.yahoo.com) and computes the
following for four tickers, namely [IBM](http://finance.yahoo.com/q?s=IBM),
[GOOG](http://finance.yahoo.com/q?s=GOOG), [AAPL](http://finance.yahoo.com/q?s=AAPL), and [YHOO](http://finance.yahoo.com/q?s=YHOO).

1. Quote: Consisting of last trade price, last trade time, and
   total volume for the day
2. 
Per-minute chart data: Highest trade price, lowest trade + price, and volume during that minute +3. Simple Moving Average: trade price over 5 minutes + + +Total volume must ensure that all trade volume for that day is +added, i.e. data loss would result in wrong results. Charting data needs +all the trades in the same minute to go to the same slot, and then on it +starts afresh, so again data loss would result in wrong results. The +aggregation for charting data is done over 1 minute. Simple moving +average computes the average price over a 5 minute sliding window; it +too would produce wrong results if there is data loss. Figure 1 shows +the application with no partitioning. + + + + + + + +The operator StockTickerInput: StockTickerInput[ ](http://docs.google.com/../apidocs/com/datatorrent/demos/yahoofinance/StockTickInput.html)is +the input operator that reads live data from Yahoo! Finance once per +interval (user configurable in milliseconds), and emits the price, the +incremental volume, and the last trade time of each stock symbol, thus +emulating real ticks from the exchange.  We utilize the Yahoo! Finance +CSV web service interface.  For example: + + +``` +$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1' +"IBM",203.966,1513041,"1:43pm" +"GOOG",762.68,1879741,"1:43pm" +"AAPL",444.3385,11738366,"1:43pm" +"YHOO",19.3681,14707163,"1:43pm" +``` + + +Among all the operators in Figure 1, StockTickerInput is the only +operator that requires extra code because it contains a custom mechanism +to get the input data.  Other operators are used unchanged from the +Malhar library. + + +Here is the class implementation for StockTickInput: + + +```java +package com.datatorrent.demos.yahoofinance; + +import au.com.bytecode.opencsv.CSVReader; +import com.datatorrent.annotation.OutputPortFieldAnnotation; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.KeyValPair; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.*; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.cookie.CookiePolicy; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.params.DefaultHttpParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This operator sends price, volume and time into separate ports and calculates incremental volume. + */ +public class StockTickInput implements InputOperator +{ + private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class); + /** + * Timeout interval for reading from server. 0 or negative indicates no timeout. + */ + public int readIntervalMillis = 500; + /** + * The URL of the web service resource for the POST request. + */ + private String url; + public String[] symbols; + private transient HttpClient client; + private transient GetMethod method; + private HashMap<String, Long> lastVolume = new HashMap<String, Long>(); + private boolean outputEvenIfZeroVolume = false; + /** + * The output port to emit price. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>(); + /** + * The output port to emit incremental volume. 
+ */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>(); + /** + * The output port to emit last traded time. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>(); + + /** + * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1 + * + * @return the URL + */ + private String prepareURL() + { + String str = "http://download.finance.yahoo.com/d/quotes.csv?s="; + for (int i = 0; i < symbols.length; i++) { + if (i != 0) { + str += ","; + } + str += symbols[i]; + } + str += "&f=sl1vt1&e=.csv"; + return str; + } + + @Override + public void setup(OperatorContext context) + { + url = prepareURL(); + client = new HttpClient(); + method = new GetMethod(url); + DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); + } + + @Override + public void teardown() + { + } + + @Override + public void emitTuples() + { + + try { + int statusCode = client.executeMethod(method); + if (statusCode != HttpStatus.SC_OK) { + System.err.println("Method failed: " + method.getStatusLine()); + } + else { + InputStream istream = method.getResponseBodyAsStream(); + // Process response + InputStreamReader isr = new InputStreamReader(istream); + CSVReader reader = new CSVReader(isr); + List<String[]> myEntries = reader.readAll(); + for (String[] stringArr: myEntries) { + ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr)); + if (tuple.size() != 4) { + return; + } + // input csv is <Symbol>,<Price>,<Volume>,<Time> + String symbol = tuple.get(0); + double currentPrice = Double.valueOf(tuple.get(1)); + long currentVolume = Long.valueOf(tuple.get(2)); + String timeStamp = tuple.get(3); + long vol = currentVolume; + // Sends total volume in first tick, and incremental volume afterwards. + if (lastVolume.containsKey(symbol)) { + vol -= lastVolume.get(symbol); + } + + if (vol > 0 || outputEvenIfZeroVolume) { + price.emit(new KeyValPair<String, Double>(symbol, currentPrice)); + volume.emit(new KeyValPair<String, Long>(symbol, vol)); + time.emit(new KeyValPair<String, String>(symbol, timeStamp)); + lastVolume.put(symbol, currentVolume); + } + } + } + Thread.sleep(readIntervalMillis); + } + catch (InterruptedException ex) { + logger.debug(ex.toString()); + } + catch (IOException ex) { + logger.debug(ex.toString()); + } + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume) + { + this.outputEvenIfZeroVolume = outputEvenIfZeroVolume; + } + +} +``` + + + +The operator has three output ports that emit the price of the +stock, the volume of the stock and the last trade time of the stock, +declared as public member variables price, volume and time of the class.  The tuple of the +price output port is a key-value +pair with the stock symbol being the key, and the price being the value. + The tuple of the volume output +port is a key value pair with the stock symbol being the key, and the +incremental volume being the value.  The tuple of the time output port is a key value pair with the +stock symbol being the key, and the last trade time being the +value. 
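
To make the port contract concrete, the fragment below sketches how a
downstream operator could consume the price output port. It is
illustrative only (no such operator exists in the demo, which wires
price into library operators instead), and it assumes KeyValPair
exposes getKey() and getValue() accessors.

```java
// Hypothetical input port on a downstream operator that consumes
// StockTickInput.price; the declared tuple type must match the
// output port's tuple type exactly.
public final transient DefaultInputPort<KeyValPair<String, Double>> priceIn =
    new DefaultInputPort<KeyValPair<String, Double>>()
{
  @Override
  public void process(KeyValPair<String, Double> tuple)
  {
    // tuple.getKey() is the stock symbol, tuple.getValue() the last trade price.
    System.out.println(tuple.getKey() + " -> " + tuple.getValue());
  }
};
```

In the DAG, such a port would simply be added as another sink of the
price stream, for example dag.addStream("price", tick.price, consumer.priceIn).
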

Important: Since operators will be
serialized, all input and output ports need to be declared transient
because they are stateless and should not be serialized.

The method setup(OperatorContext)
contains the code that is necessary for setting up the HTTP
client for querying Yahoo! Finance.

The method emitTuples() contains
the code that reads from Yahoo! Finance, and emits the data to the
output ports of the operator.  emitTuples() will be called one or more times
within one application window as long as time is allowed within the
window.

Note that we want to emulate the tick input stream by having
incremental volume data with Yahoo! Finance data.  We therefore subtract
the previous volume from the current volume to emulate incremental
volume for each tick.

The operator
DailyVolume: This operator
reads from the input port, which contains the incremental volume tuples
from StockTickInput, and
aggregates the data to provide the cumulative volume.  It uses the
library class SumKeyVal<K,V> provided in the math package.  In this case,
SumKeyVal<String,Long>, where K is the stock symbol, V is the
aggregated volume, with cumulative
set to true. (If cumulative were set to false, SumKeyVal would
provide the sum for just the application window.)  Malhar provides a number
of built-in operators for simple operations like this so that
application developers do not have to write them.  More examples
follow. This operator assumes that the application restarts before the
market opens every day.

The operator Quote:
This operator has three input ports, which are price (from
StockTickInput), daily\_vol (from
DailyVolume), and time (from
StockTickInput).  This operator
just consolidates the three data items and emits the consolidated
data.  It utilizes the class ConsolidatorKeyVal<K> from the
stream package.

The operator HighLow: This operator reads from the input port,
which contains the price tuples from StockTickInput, and provides the high and the
low price within the application window.  It utilizes the library class
RangeKeyVal<K,V> provided
in the math package. In this case,
RangeKeyVal<String,Double>.

The operator MinuteVolume:
This operator reads from the input port, which contains the
volume tuples from StockTickInput,
and aggregates the data to provide the sum of the volume within one
minute.  Like the operator DailyVolume, this operator also uses
SumKeyVal<String,Long>, but
with cumulative set to false.  The
application window is set to one minute. We will explain how to set this
later.

The operator Chart:
This operator is very similar to the operator Quote, except that it takes inputs from
HighLow and MinuteVolume and outputs the consolidated tuples
to the output port.

The operator PriceSMA:
SMA stands for Simple Moving Average. It reads from the
input port, which contains the price tuples from StockTickInput, and
provides the moving average price of the stock.  It utilizes
SimpleMovingAverage<String,Double>, which is provided in the
multiwindow package.
SimpleMovingAverage keeps track of the data of the previous N
application windows in a sliding manner.  For each end window event, it
provides the average of the data in those application windows.

The operator Console:
This operator just outputs the input tuples to the console
(or stdout).  In this example, there are three console operators, which connect to the outputs
of Quote, Chart, and PriceSMA.
 In +practice, they should be replaced by operators that use the data to +produce visualization artifacts like charts. + + + +Connecting the operators together and constructing the +DAG: Now that we know the +operators used, we will create the DAG, set the streaming window size, +instantiate the operators, and connect the operators together by adding +streams that connect the output ports with the input ports among those +operators.  This code is in the file YahooFinanceApplication.java. Refer to Figure 1 +again for the graphical representation of the DAG.  The last method in +the code, namely getApplication(), +does all that.  The rest of the methods are just for setting up the +operators. + + + +```java +package com.datatorrent.demos.yahoofinance; + +import com.datatorrent.api.ApplicationFactory; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.math.RangeKeyVal; +import com.datatorrent.lib.math.SumKeyVal; +import com.datatorrent.lib.multiwindow.SimpleMovingAverage; +import com.datatorrent.lib.stream.ConsolidatorKeyVal; +import com.datatorrent.lib.util.HighLow; +import org.apache.hadoop.conf.Configuration; + +/** + * Yahoo! Finance application demo. <p> + * + * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes. + */ +public class Application implements StreamingApplication +{ + private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms) + private int appWindowCountMinute = 60; // 1 minute + private int appWindowCountSMA = 5 * 60; // 5 minute + + /** + * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price. + */ + public StockTickInput getStockTickInputOperator(String name, DAG dag) + { + StockTickInput oper = dag.addOperator(name, StockTickInput.class); + oper.readIntervalMillis = 200; + return oper; + } + + /** + * This sends total daily volume by adding volumes from each ticks. + */ + public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag) + { + SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>()); + oper.setType(Long.class); + oper.setCumulative(true); + return oper; + } + + /** + * Get aggregated volume of 1 minute and send at the end window of 1 minute. + */ + public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount) + { + SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>()); + oper.setType(Long.class); + oper.setEmitOnlyWhenChanged(true); +dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount); + return oper; + } + + /** + * Get High-low range for 1 minute. 
+ */ + public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount) + { + RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>()); + dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount); + oper.setType(Double.class); + return oper; + } + + /** + * Quote (Merge price, daily volume, time) + */ + public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag) + { + ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>()); + return oper; + } + + /** + * Chart (Merge minute volume and minute high-low) + */ + public ConsolidatorKeyVal<String,HighLow,Long,?,?,?> getChartOperator(String name, DAG dag) + { + ConsolidatorKeyVal<String,HighLow,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow,Long,Object,Object,Object>()); + return oper; + } + + /** + * Get simple moving average of price. + */ + public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount) + { + SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>()); + oper.setWindowSize(appWindowCount); + oper.setType(Double.class); + return oper; + } + + /** + * Get console for output. + */ + public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix) + { + ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class); + oper.setStringFormat(prefix + ": %s"); + return oper.input; + } + + /** + * Create Yahoo Finance Application DAG. + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds); + + StockTickInput tick = getStockTickInputOperator("StockTickInput", dag); + SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag); + ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag); + + RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute); + SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute); + ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag); + + SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA); + DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>(); + dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec); + dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec); + dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data); + dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data); + dag.addStream("time", tick.time, quoteOperator.in3); + dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2); + + dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE")); + + dag.addStream("high_low", highlow.range, chartOperator.in1); + dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2); + dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART")); + + dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA")); + + return dag; + } + +} 
```

Note that we also set a user-specific sliding window for SMA that
keeps track of the previous N data points.  Do not confuse this with the
attribute APPLICATION\_WINDOW\_COUNT.

In the rest of this chapter we will run through the process of
running this application. We assume that you are familiar with the details
of your Hadoop infrastructure. For installation
details please refer to the [Installation Guide](installation.md).


Running a Test Application
-----------------------------------------

We will now describe how to run the Yahoo!
Finance application described above in different modes
(local mode, single node on Hadoop, and multi-node on Hadoop).

The platform runs streaming applications under the control of a
light-weight Streaming Application Manager (STRAM). Each application has
its own instance of STRAM. STRAM launches the application and
continually provides run time monitoring and analysis, and takes actions
such as load scaling or outage recovery as needed.  We will discuss
STRAM in more detail in the next chapter.

The instructions below assume that the platform was installed in a
directory <INSTALL\_DIR> and that the command line interface (CLI) will
be used to launch the demo application. An application can be run in
[local mode](#h.3dy6vkm) (in IDE or from command line) or on a [Hadoop cluster](#h.1t3h5sf).

To start the dtCli, run

    <INSTALL_DIR>/bin/dtcli

The command line prompt appears. To start the application in local mode (the actual version number in the file name may differ)

    dt> launch -local <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa

To terminate the application in local mode, enter Ctrl-C

To run the application on the Hadoop cluster (the actual version
number in the file name may differ)

    dt> launch <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa

To stop the application running in Hadoop, terminate it in the dtCli:

    dt> kill-app

Executing the application in either mode includes the following
steps. At a top level, STRAM (Streaming Application Manager) validates
the application (DAG), translates the logical plan to the physical plan
and then launches the execution engine. The mode determines the
resources needed and how they are used.

Local Mode
-----------------------

In local mode, the application is run as a single process with multiple threads. Although a
few Hadoop classes are needed, there is no dependency on a Hadoop
cluster or Hadoop services. The local file system is used in place of
HDFS. This mode allows a quick run of an application in a single-process
sandbox, and hence is the most suitable for debugging and analyzing the
application logic. This mode is recommended for developing the
application and can be used for running applications within the IDE for
functional testing purposes. Due to limited resources and lack of
scalability, an application running in this single-process mode is more
likely to encounter throughput bottlenecks. A distributed cluster is
recommended for benchmarking and production testing.

Hadoop Cluster
---------------------------

In this section we discuss various Hadoop cluster setups.

### Single Node Cluster

In a single node Hadoop cluster all services are deployed on a
single server (a developer can use his/her development machine as a
single node cluster). The platform does not distinguish between a single
or multi-node setup and behaves exactly the same in both cases.

In this mode, the resource manager, name node, data node, and node
manager occupy one process each. This is an example of running a
streaming application as a multi-process application on the same server.
With the prevalence of fast, multi-core systems, this mode is effective for
debugging, fine tuning, and generic analysis before submitting the job
to a larger Hadoop cluster. In this mode, execution uses the Hadoop
services and hence is likely to identify issues that are related to the
Hadoop environment (such issues will not be uncovered in local mode).
The throughput will obviously not be as high as on a multi-node Hadoop
cluster. Additionally, since each container (i.e. Java process) requires
a significant amount of memory, you will be able to run a much smaller
number of containers than on a multi-node cluster.

### Multi-Node Cluster

In a multi-node Hadoop cluster all the services of Hadoop are
typically distributed across multiple nodes in a production or
production-level test environment. Upon launch the application is
submitted to the Hadoop cluster and executes as a multi-process
application on multiple nodes.

Before you start deploying, testing and troubleshooting your
application on a cluster, you should ensure that Hadoop (version 2.2.0
or later) is properly installed and
you have basic skills for working with it.

------------------------------------------------------------------------

Apache Apex Platform Overview
========================================

Streaming Computational Model
------------------------------------------

In this chapter, we describe the basics of the real-time streaming platform and its computational model.

The platform is designed to enable completely asynchronous real-time computations done in as unblocked a way as possible with
minimal overhead.

Applications running in the platform are represented by a Directed
Acyclic Graph (DAG) made up of operators and streams. All computations
are done in memory on arrival of
the input data, with an option to save the output to disk (HDFS) in a
non-blocking way. The data that flows between operators consists of
atomic data elements. Each data element along with its type definition
(henceforth called schema) is
called a tuple. An application is a
design of the flow of these tuples to and from
the appropriate compute units to enable the computation of the final
desired results. A message queue (henceforth called
buffer server) manages tuples streaming
between compute units in different processes. This server keeps track of
all consumers, publishers, and partitions, and enables replay. More
information is given in a later section.

The streaming application is monitored by a decision-making entity
called STRAM (streaming application
manager). STRAM is designed to be a lightweight
controller that has minimal but sufficient interaction with the
application. This is done via periodic heartbeats. The
STRAM does the initial launch and periodically analyzes the system
metrics to decide if any run time action needs to be taken.

A fundamental building block for the streaming platform
is the concept of breaking up a stream into equal finite time slices
called streaming windows. Each window contains the ordered
set of tuples in that time slice. A typical duration of a window is 500
ms, but it can be configured per application (the Yahoo! Finance
application configures this value in the properties.xml file to be 1000 ms = 1 s).
Each
window is preceded by a begin\_window event and is terminated by an
end\_window event, and is assigned
a unique window ID. Even though the platform performs computations at
the tuple level, bookkeeping is done at the window boundary, making the
computations within a window an atomic event in the platform.  We can
think of each window as an atomic
micro-batch of tuples, to be processed together as one
atomic operation (see Figure 2).

This atomic batching allows the platform to avoid the very steep
per-tuple bookkeeping cost and instead incur a manageable per-batch
bookkeeping cost. This translates to higher throughput, low recovery
time, and higher scalability. Later in this document we illustrate how
the atomic micro-batch concept allows more efficient optimization
algorithms.

The platform also has built-in support for
application windows. An application window is part of the
application specification, and can be a small or large multiple of the
streaming window.  An example from our Yahoo! Finance test application
is the moving average, calculated over a sliding application window of 5
minutes, which (with 1-second streaming windows) equates to 300 (= 5 \* 60)
streaming windows.

Note that these two window concepts are distinct.  A streaming
window is an abstraction of many tuples into a higher atomic event for
easier management.  An application window is a group of consecutive
streaming windows used for data aggregation (e.g. sum, average, maximum,
minimum) on a per-operator level.

Alongside the platform, a set of
predefined, benchmarked standard library operator templates is provided
for ease of use and rapid development of applications. These
operators are open sourced to the Apache Software Foundation under the
project name "Malhar" as part of our efforts to foster community
innovation. Some of these operators can be used in a DAG as is, while others
have [properties](#h.32hioqz) that can be set to specify the
desired computation. Those interested in details should refer to
[Apex Malhar Operator Library](apex_malhar.md).

The platform is a Hadoop YARN native
application. It runs in a Hadoop cluster just like any
other YARN application (MapReduce etc.) and is designed to seamlessly
integrate with the rest of the Hadoop technology stack. It leverages Hadoop as
much as possible and relies on it as its distributed operating system.
Hadoop dependencies include resource management, compute/memory/network
allocation, HDFS, security, fault tolerance, monitoring, metrics,
multi-tenancy, logging etc. Hadoop classes/concepts are reused as much
as possible. The aim is to enable enterprises
to leverage their existing Hadoop infrastructure for real-time streaming
applications. The platform is designed to scale with big
data applications and scale with Hadoop.

A streaming application is an asynchronous execution of
computations across distributed nodes. All computations are done in
parallel on a distributed cluster. The computation model is designed to
do as many parallel computations as possible in a non-blocking fashion.
The task of monitoring the entire application is done on (streaming)
window boundaries with a streaming window as an atomic entity. A window
completion is a quantum of work done. There is no assumption that an
operator can be interrupted at precisely a particular tuple or window.

An operator itself also
cannot assume or predict the exact time a tuple that it emitted would
get consumed by downstream operators.
The operator processes the tuples +it gets and simply emits new tuples based on its business logic. The +only guarantee it has is that the upstream operators are processing +either the current or some later window, and the downstream operator is +processing either the current or some earlier window. The completion of +a window (i.e. propagation of the end\_window event through an operator) in any +operator guarantees that all upstream operators have finished processing +this window. Thus, the end\_window event is blocking on an operator +with multiple outputs, and is a synchronization point in the DAG. The + begin\_window event does not have +any such restriction, a single begin\_window event from any upstream operator +triggers the operator to start processing tuples. + +Streaming Application Manager (STRAM) +-------------------------------------------------- + +Streaming Application Manager (STRAM) is the Hadoop YARN native +application master. STRAM is the first process that is activated upon +application launch and orchestrates the streaming application on the +platform. STRAM is a lightweight controller process. The +responsibilities of STRAM include + +1. Running the Application + + * Read the logical plan of the application (DAG) submitted by the client + * Validate the logical plan + * Translate the logical plan into a physical plan, where certain operators may be partitioned (i.e. replicated) to multiple operators for handling load. + * Request resources (Hadoop containers) from Resource Manager, + per physical plan + * Based on acquired resources and application attributes, create + an execution plan by partitioning the DAG into fragments, + each assigned to different containers. + * Executes the application by deploying each fragment to + its container. Containers then start stream processing and run + autonomously, processing one streaming window after another. Each + container is represented as an instance of the StreamingContainer class, which updates + STRAM via the heartbeat protocol and processes directions received + from STRAM. + +2. Continually monitoring the application via heartbeats from each StreamingContainer +3. Collecting Application System Statistics and Logs +4. Logging all application-wide decisions taken +5. Providing system data on the state of the application via a Web Service. +6. Supporting [Fault Tolerance](#h.2nusc19) + + a. Detecting a node outage + b. Requesting a replacement resource from the Resource Manager + and scheduling state restoration for the streaming operators + c. Saving state to Zookeeper + +7. Supporting [Dynamic + Partitioning](#h.3hv69ve)[:](#h.3hv69ve) Periodically + evaluating the SLA and modifying the physical plan if required + (logical plan does not change). +8. Enabling [Security](#h.3q5sasy)[:](#h.3q5sasy) Distributing + security tokens for distributed components of the execution engine + and securing web service requests. +9. Enabling [Dynamic modification](#h.40ew0vw)[ ](#h.40ew0vw)of + DAG: In the future, we intend to allow for user initiated + modification of the logical plan to allow for changes to the + processing logic and functionality. + + + +An example of the Yahoo! Finance Quote application scheduled on a +cluster of 5 Hadoop containers (processes) is shown in Figure 3. + + + + + +An example for the translation from a logical plan to a physical +plan and an execution plan for a subset of the application is shown in +Figure 4. 

Hadoop Components
------------------------------

In this section we cover some aspects of Hadoop that your
streaming application interacts with. This section is not meant to
educate the reader on Hadoop, but just to get the reader acquainted with
the terms. We strongly advise readers to learn Hadoop from other
sources.

A streaming application runs as a native Hadoop 2.2 application.
Hadoop 2.2 does not differentiate between a map-reduce job and other
applications, and hence as far as Hadoop is concerned, the streaming
application is just another job. This means that your application
leverages all the bells and whistles Hadoop provides and is fully
supported within the Hadoop technology stack. The platform is responsible
for properly integrating itself with the relevant components of Hadoop
that exist today and those that may emerge in the future.

All investments that leverage multi-tenancy (for example quotas
and queues), security (for example Kerberos), data flow integration (for
example copying data in and out of HDFS), monitoring, metrics collection,
etc. will require no changes when streaming applications run on
Hadoop.

### YARN

[YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site) is
the core library of Hadoop 2.2 that is tasked with resource management
and works as a distributed application framework. In this section we
will walk through YARN's components. In Hadoop 2.2, the old JobTracker
has been replaced by a combination of ResourceManager (RM) and
ApplicationMaster (AM).

#### Resource Manager (RM)

[ResourceManager](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) (RM)
manages all the distributed resources. It allocates and arbitrates all
the slots and the resources (cpu, memory, network) of these slots. It
works with per-node NodeManagers (NMs) and per-application
ApplicationMasters (AMs). Currently memory usage is monitored by the RM; in
upcoming releases it will have CPU as well as network management. The RM is
shared by map-reduce and streaming applications. Running streaming
applications requires no changes in the RM.

#### Application Master (AM)

The AM is the watchdog or monitoring process for your application
and has the responsibility of negotiating resources with the RM and
interacting with NodeManagers to get the allocated containers started.
The AM is the starting point of your application and is considered user
code (not system Hadoop code). The AM itself runs in one container. All
resource management within the application is managed by the AM. This
is a critical feature for Hadoop 2.2, where tasks done by the JobTracker in
Hadoop 1.0 have been distributed, allowing Hadoop 2.2 to scale much
beyond Hadoop 1.0. STRAM is a native YARN ApplicationMaster.

#### Node Managers (NM)

There is one [NodeManager](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) (NM)
per node in the cluster. All the containers (i.e. processes) on that
node are monitored by the NM. It takes instructions from the RM and manages
resources of that node as per RM instructions. NM interactions are the same
for map-reduce and for streaming applications. Running streaming
applications requires no changes in the NM.

#### RPC Protocol

Communication among the RM, AM, and NM is done via the Hadoop RPC
protocol. Streaming applications use the same protocol to send their
data.
No changes are needed in the RPC support provided by Hadoop to enable
communication done by components of your application.

### HDFS

Hadoop includes a highly fault-tolerant, high-throughput
distributed file system ([HDFS](http://hadoop.apache.org/docs/r1.0.4/hdfs_design.html)).
It runs on commodity hardware, and your streaming application will, by
default, use it. There is no difference between files created by a
streaming application and those created by map-reduce.

Developing An Application
======================================

In this chapter we describe the methodology to develop an
application using the Realtime Streaming Platform. The platform was
designed to make it easy to build and launch sophisticated streaming
applications, with the developer having to deal only with the
application/business logic. The platform deals with details of where to
run which operators on which servers and how to correctly route streams
of data among them.

Development Process
--------------------------------

While the platform does not mandate a specific methodology or set
of development tools, we have recommendations to maximize productivity
for the different phases of application development.

#### Design

- Identify common, reusable operators. Use a library
  if possible.
- Identify scalability and performance requirements before
  designing the DAG.
- Leverage attributes that the platform supports for scalability
  and performance.
- Use operators that are benchmarked and tested so that later
  surprises are minimized. If you have glue code, create appropriate
  unit tests for it.
- Use THREAD_LOCAL locality for high throughput streams. If all
  the operators on that stream cannot fit in one container,
  try NODE_LOCAL locality. Both THREAD_LOCAL and
  NODE_LOCAL streams avoid the Network Interface Card (NIC)
  completely. The former uses intra-process communication to also avoid
  serialization-deserialization overhead.
- The overall throughput and latencies are not necessarily
  correlated to the number of operators in a simple way -- the
  relationship is more nuanced. A lot depends on how much work
  individual operators are doing, how many are able to operate in
  parallel, and how much data is flowing through the arcs of the DAG.
  It is, at times, better to break a computation down into its
  constituent simple parts and then stitch them together via streams
  to better utilize the compute resources of the cluster. Decide, on a
  per-application basis, where to draw the line between the complexity
  of each operator and having too many streams. Doing multiple
  computations in one operator does save network I/O, while operators
  that are too complex are hard to maintain.
- As far as possible, do not use operators that depend on the
  arrival order of tuples from two streams; in such cases the behavior
  is not idempotent.
- Persist key information to HDFS if possible; it may be useful
  for debugging later.
- Decide on an appropriate fault tolerance mechanism. If some
  data loss is acceptable, use the at-most-once mechanism as it has
  the fastest recovery.

#### Creating New Project

Please refer to the [Apex Application Packages](application_packages.md) for
the basic steps for creating a new project.

#### Writing the application code

Preferably use an IDE (Eclipse, NetBeans etc.) that allows you to
manage dependencies and assists with the Java coding. Specific benefits
include ease of managing operator library jar files, individual operator
classes, ports and properties.
It will also highlight and assist to +rectify issues such as type mismatches when adding streams while +typing. + +#### Testing + +Write test cases with JUnit or similar test framework so that code +is tested as it is written. For such testing, the DAG can run in local +mode within the IDE. Doing this may involve writing mock input or output +operators for the integration points with external systems. For example, +instead of reading from a live data stream, the application in test mode +can read from and write to files. This can be done with a single +application DAG by instrumenting a test mode using settings in the +configuration that is passed to the application factory +interface. + +Good test coverage will not only eliminate basic validation errors +such as missing port connections or property constraint violations, but +also validate the correct processing of the data. The same tests can be +re-run whenever the application or its dependencies change (operator +libraries, version of the platform etc.) + +#### Running an application + +The platform provides a commandline tool called dtcli for managing applications (launching, +killing, viewing, etc.). This tool was already discussed above briefly +in the section entitled Running the Test Application. It will introspect +the jar file specified with the launch command for applications (classes +that implement ApplicationFactory) or property files that define +applications. It will also deploy the dependency jar files from the +application package to the cluster. + + + +Dtcli can run the application in local mode (i.e. outside a +cluster). It is recommended to first run the application in local mode +in the development environment before launching on the Hadoop cluster. +This way some of the external system integration and correct +functionality of the application can be verified in an easier to debug +environment before testing distributed mode. + + + +For more details on CLI please refer to the [dtCli Guide](dtcli.md). + +Application API +---------------------------- + +This section introduces the API to write a streaming application. +The work involves connecting operators via streams to form the logical +DAG. The steps are + +1. Instantiate an application (DAG) + +2. (Optional) Set Attributes + * Assign application name + * Set any other attributes as per application requirements + +3. Create/re-use and instantiate operators + * Assign operator name that is unique within the application + * Declare schema upfront for each operator (and thereby its [ports](#h.ihv636)[)](#h.ihv636) + * (Optional) Set [properties](#h.32hioqz)[ ](#h.32hioqz) and [attributes](#h.41mghml)[ ](#h.41mghml) on the dag as per specification + * Connect ports of operators via streams + * Each stream connects one output port of an operator to one or more input ports of other operators. + * (Optional) Set attributes on the streams + +4. Test the application. + + + +There are two methods to create an application, namely Java, and +Properties file. Java API is for applications being developed by humans, +and properties file (Hadoop like) is more suited for DAGs generated by +tools. + +### Java API + +The Java API is the most common way to create a streaming +application. It is meant for application developers who prefer to +leverage the features of Java, and the ease of use and enhanced +productivity provided by IDEs like NetBeans or Eclipse. 
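
Before we look at the DAG construction itself, it is worth tying the
Java API back to the local-mode testing workflow described earlier. The
sketch below is one way to exercise a Java-defined application from a
JUnit-style test; it assumes the LocalMode utility from
com.datatorrent.api, and the test class and method names are
illustrative.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

public class ApplicationLocalTest
{
  public void testApplicationInLocalMode() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);

    // Populate the DAG exactly as the CLI would, then run it in-process.
    lma.prepareDAG(new Application(), conf);
    LocalMode.Controller lc = lma.getController();

    // Run the DAG for about 10 seconds, then shut it down.
    lc.run(10000);
  }
}
```

A harness of this kind is what typically sits behind tests such as the
ApplicationTest.java mentioned in the first chapter.
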
Using Java to +specify the application provides extra validation abilities of Java +compiler, such as compile time checks for type safety at the time of +writing the code. Later in this chapter you can read more about +validation support in the platform. + +The developer specifies the streaming application by implementing +the ApplicationFactory interface, which is how platform tools (CLI etc.) +recognize and instantiate applications. Here we show how to create a +Yahoo! Finance application that streams the last trade price of a ticker +and computes the high and low price in every 1 min window. Run above + test application to execute the +DAG in local mode within the IDE. + + + +Let us revisit how the Yahoo! Finance test application constructs the DAG: + + + +```java +public class Application implements StreamingApplication +{ +[...CUT...] + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds); + + StockTickInput tick = getStockTickInputOperator("StockTickInput", dag); + SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag); + ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag); + + RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute); + SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute); + ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag); + + SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA); + + dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data); + dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data); + dag.addStream("time", tick.time, quoteOperator.in3); + dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2); + + dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE")); + + dag.addStream("high_low", highlow.range, chartOperator.in1); + dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2); + dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART")); + + dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA")); + + return dag; + } +} +``` + + + + +### Property File API + +The platform also supports specification of a DAG via a property +file. The aim here to make it easy for tools to create and run an +application. This method of specification does not have the Java +compiler support of compile time check, but since these applications +would be created by software, they should be correct by construction. +The syntax is derived from Hadoop properties and should be easy for +folks who are used to creating software that integrated with +Hadoop. + + + +Create an application (DAG): myApplication.properties + + +``` +# input operator that reads from a file +dt.operator.inputOp.classname=com.acme.SampleInputOperator +dt.operator.inputOp.fileName=somefile.txt + +# output operator that writes to the console +dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator + +# stream connecting both operators +dt.stream.inputStream.source=inputOp.outputPort +dt.stream.inputStream.sinks=outputOp.inputPort +``` + + + +Above snippet is intended to convey the basic idea of specifying +the DAG without using Java. 
Operators would come from a predefined
library and be referenced in the specification by class name and port names
(obtained from the library provider's documentation or runtime
introspection by tools). For those interested in details, see later
sections and refer to the Operation and
Installation Guide mentioned above.

### Attributes

Attributes impact the runtime behavior of the application. They do
not impact the functionality. An example of an attribute is application
name. Setting it changes the application name. Another example is
streaming window size. Setting it changes the streaming window size from
the default value to the specified value. Users cannot add new
attributes; they can only choose from the ones that come packaged and
pre-supported by the platform. Details of attributes are covered in the
Operation and Installation
Guide.

Operators
----------------------

Operators are basic compute units.
Operators process each incoming tuple and emit zero or more tuples on
output ports as per the business logic. The data flow, connectivity,
fault tolerance (node outage), etc. is taken care of by the platform. As
an operator developer, all that is needed is to figure out what to do
with the incoming tuple and when (and on which output port) to send out a
particular output tuple. Correctly designed operators will most likely
get reused. Operator design needs care and foresight. For details, refer
to the [Operator Developer
Guide](https://www.datatorrent.com/docs/guides/OperatorDeveloperGuide.html).
As an application developer you need to connect operators
in a way that implements your business logic. You may also require
operator customization for functionality and use attributes for
performance/scalability etc.

All operators process tuples asynchronously in a distributed
cluster. An operator cannot assume or predict the exact time a tuple
that it emitted will get consumed by a downstream operator. An operator
also cannot predict the exact time when a tuple arrives from an upstream
operator. The only guarantee is that the upstream operators are
processing the current or a future window, i.e. the windowId of an upstream
operator equals or exceeds its own windowId. Conversely, the windowId
of a downstream operator is less than or equal to its own windowId. The
end of a window operation, i.e. the API call to endWindow on an operator,
requires that all upstream operators have finished processing this
window. This means that completion of processing a window propagates in
a blocking fashion through an operator. Later sections provide more
details on streams and the data flow of tuples.

Each operator has a unique name within the DAG as provided by the
user. This is the name of the operator in the logical plan. The name of
the operator in the physical plan is an integer assigned to it by STRAM.
These integers use the sequence from 1 to N, where N is the total number
of physically unique operators in the DAG.  Following the same rule,
each partitioned instance of a logical operator has its own integer as
an id. This id along with the Hadoop container name uniquely identifies
the operator in the execution plan of the DAG. The logical names and the
physical names are required for web service support. Operators can be
accessed via both names. These same names are used while interacting
with dtcli to access an operator.
Ideally these names should be self-descriptive.
For example, in Figure 1,
the node named "DailyVolume" has a physical identifier of 2.

### Operator Interface

The operator interface in a DAG consists of [ports](#h.ihv636), [properties](#h.32hioqz), and
[attributes](#h.41mghml). Operators interact with other
components of the DAG via ports. Functional behavior of the operators
can be customized via properties. Run time performance and physical
instantiation is controlled by attributes. Ports and properties are
fields (variables) of the Operator class/object, while attributes are
meta information that is attached to the operator object via an
AttributeMap. An operator must have at least one port. Properties are
optional. Attributes are provided by the platform and always have a
default value that enables normal functioning of operators.

#### Ports

Ports are connection points by which an operator receives and
emits tuples. These should be transient objects, instantiated in the
operator object, that implement particular interfaces. Ports should be
transient as they contain no state. They have a pre-defined schema and
can only be connected to other ports with the same schema. An input port
needs to implement the interface Operator.InputPort and the
interface Sink. A default
implementation of these is provided by the abstract class DefaultInputPort. An output port needs to
implement the interface Operator.OutputPort. A default implementation
of this is provided by the concrete class DefaultOutputPort. These two are a quick way to
implement the above interfaces, but operator developers have the option
of providing their own implementations.

Here are examples of an input and an output port from the operator
Sum.


```java
@InputPortFieldAnnotation(name = "data")
public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() {
  @Override
  public void process(V tuple)
  {
    ...
  }
};
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>() { ... };
```


The process call is in the Sink interface. An emit on an output
port is done via the emit(tuple) call. For the above example it would be
sum.emit(t), where the type of t is the generic parameter V.

There is no limit on how many ports an operator can have. However
any operator must have at least one port. An operator with only one port
is called an Input Adapter if it has no input port and an Output Adapter
if it has no output port. These are special operators needed to get/read
data from an outside system/source into the application, or push/write data
into an outside system/sink. These could be in Hadoop or outside of
Hadoop. These two kinds of operators are in essence gateways for the streaming
application to communicate with systems outside the application.

Port connectivity can be validated at compile time by adding the
PortFieldAnnotations shown above. By default all ports have to be
connected; to allow a port to go unconnected, you need to add
"optional=true" to the annotation.

Attributes can be specified for ports that affect the runtime
behavior. An example of an attribute is parallel partition, which specifies
a parallel computation flow per partition. It is described in detail in
the [Parallel
Partitions](#h.3vac5uf) section.
Another example is queue capacity, which specifies the buffer size for the
port. Details of attributes are covered in the Operation and Installation Guide.
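
As an illustration, port attributes are set on the DAG with the same
setInputPortAttribute mechanism the Yahoo! Finance application uses for
STREAM_CODEC. The lines below are a sketch that would sit inside
populateDAG(); QUEUE_CAPACITY and PARTITION_PARALLEL are assumed here to
be attributes defined in Context.PortContext, and the chosen values are
arbitrary.

```java
// Requires: import com.datatorrent.api.Context.PortContext;

// Enlarge the input buffer of the SMA operator's data port.
dag.setInputPortAttribute(priceSMA.data, PortContext.QUEUE_CAPACITY, 8 * 1024);

// Request a parallel partition: this input follows the partitioning of the
// upstream operator instead of merging its partitions (see the Parallel
// Partitions section).
dag.setInputPortAttribute(priceSMA.data, PortContext.PARTITION_PARALLEL, true);
```
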
#### Properties

Properties are the abstraction by which the functional behavior of an operator can be customized. They should be non-transient objects instantiated in the operator object. They need to be non-transient since they are part of the operator state, and re-construction of the operator object from its checkpointed state must restore the operator to the desired state. Properties are optional, i.e. an operator may or may not have properties; they are part of user code and their values are not interpreted by the platform in any way.

All non-serializable objects should be declared transient. Examples include sockets, session information, etc. These objects should be initialized during the setup call, which is called every time the operator is initialized.

#### Attributes

Attributes are values assigned to the operators that impact run-time behavior. This includes things like the number of partitions, the recovery mode (at-most-once, at-least-once, or exactly-once), etc. Attributes do not impact the functionality of the operator. Users can change certain attributes at runtime. Users cannot add attributes to operators; they are pre-defined by the platform. They are interpreted by the platform and thus, unlike properties, cannot be defined in user-created code. Details of attributes are covered in the [Operation and Installation Guide](http://docs.google.com/OperationandInstallationGuide.html).

### Operator State

The state of an operator is defined as the data that it transfers from one window to a future window. Since the computing model of the platform is to treat windows like micro-batches, the operator state can be [checkpointed](#h.3mzq4wv) every Nth window, or every T units of time, where T is significantly greater than the streaming window. When an operator is checkpointed, the entire object is written to HDFS. The larger the amount of state in an operator, the longer it takes to recover from a failure; a stateless operator can recover much quicker than a stateful one. The needed windows are preserved by the upstream buffer server and are used to recompute the lost windows, and also to rebuild the buffer server in the current container.

The distinction between Stateless and Stateful is based solely on the need to transfer data in the operator from one window to the next. The state of an operator is independent of the number of ports.

#### Stateless

A Stateless operator is defined as one where no data needs to be kept at the end of every window. This means that all the computations of a window can be derived from all the tuples the operator receives within that window. This guarantees that the output of any window can be reconstructed by simply replaying the tuples that arrived in that window. Stateless operators are more efficient in terms of fault tolerance, and the cost to achieve an SLA.

#### Stateful

A Stateful operator is defined as one where data needs to be stored at the end of a window for computations occurring in a later window; a common example is the computation of a sum of values in the input tuples.

### Operator API

The Operator API consists of methods that operator developers may need to override. In this section we will discuss the Operator API from the point of view of an application developer. Knowledge of how an operator works internally is critical for writing an application. Those interested in the details should refer to the Malhar Operator Developer Guide.
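To tie properties, transient fields, and operator state together before looking at the window-related calls, here is a minimal sketch of a custom operator. It assumes the BaseOperator convenience class and the com.datatorrent.api package layout; the operator itself is illustrative and not part of the library.

```java
import java.util.HashMap;
import java.util.Map;

import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;

// Counts tuples per key within each window and emits the counts in endWindow.
public class KeyCountOperator extends BaseOperator
{
  // Property: non-transient, part of the checkpointed state, configurable by the application.
  private int emitThreshold = 1;

  // Window-scoped state: checkpointed with the operator, cleared at the end of each window.
  private final Map<String, Integer> counts = new HashMap<String, Integer>();

  // Ports are transient since they hold no state.
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      Integer current = counts.get(tuple);
      counts.put(tuple, current == null ? 1 : current + 1);
    }
  };

  public final transient DefaultOutputPort<Map<String, Integer>> output =
      new DefaultOutputPort<Map<String, Integer>>();

  @Override
  public void setup(OperatorContext context)
  {
    // Non-serializable resources (sockets, sessions, ...) would be declared
    // transient and re-created here, since setup runs on every initialization.
  }

  @Override
  public void endWindow()
  {
    Map<String, Integer> result = new HashMap<String, Integer>();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (entry.getValue() >= emitThreshold) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    output.emit(result);
    counts.clear();
  }

  // Property setter so the value can be configured when the DAG is composed.
  public void setEmitThreshold(int emitThreshold)
  {
    this.emitThreshold = emitThreshold;
  }
}
```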
The APIs are available in three modes, namely Single Streaming Window, Sliding Application Window, and Aggregate Application Window. These are not mutually exclusive, i.e. an operator can use a single streaming window as well as a sliding application window. A physical instance of an operator is always processing tuples from a single window. The processing of tuples is guaranteed to be sequential, no matter which input port the tuples arrive on.

In the later part of this section we will evaluate three common uses of streaming windows by applications. They have different characteristics and implications on optimization and recovery mechanisms (i.e. the algorithm used to recover a node after an outage) as discussed later in the section.

#### Streaming Window

A streaming window is the atomic micro-batch computation period. The API methods relating to a streaming window are as follows

<table>
<colgroup>
<col width="100%" />
</colgroup>
<tbody>
<tr class="odd">
<td align="left"><p>public void process(&lt;tuple_type&gt; tuple) // Called on the input port on which the tuple arrives</p>
<p>public void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives</p>
<p>public void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports</p>
<p>public void setup(OperatorContext context) // Called once during initialization of the operator</p>
<p>public void teardown() // Called once when the operator is being shutdown</p></td>
</tr>
</tbody>
</table>

A tuple can be emitted in any of the three streaming run-time calls, namely beginWindow, process, and endWindow, but not in setup or teardown.

#### Aggregate Application Window

An operator with an aggregate window is stateful within the application window timeframe and possibly stateless at the end of that application window. The size of an aggregate application window is an operator attribute and is defined as a multiple of the streaming window size. The platform recognizes this attribute and optimizes the operator. The beginWindow and endWindow calls are not invoked for those streaming windows that do not align with the application window. For example, in the case of a streaming window of 0.5 seconds and an application window of 5 minutes, an application window spans 600 streaming windows (5\*60\*2 = 600). At the start of the sequence of these 600 atomic streaming windows beginWindow gets invoked, and at the end of these 600 streaming windows endWindow gets invoked. The intermediate streaming windows do not invoke beginWindow or endWindow. Bookkeeping, node recovery, stats, UI, etc. continue to work off streaming windows. For example, if operators are being checkpointed on average every 30th window, then the above application window would have about 20 checkpoints.

#### Sliding Application Window

A sliding application window is a computation that requires the previous N streaming windows. After each streaming window the oldest of those N windows is dropped and the new window is added to the computation. An operator with a sliding window is a stateful operator at the end of any window. The sliding window period is an attribute and is a multiple of the streaming window. The platform recognizes this attribute and leverages it during bookkeeping. A sliding aggregate window with tolerance to data loss does not have a very high bookkeeping cost. The cost of all three recovery mechanisms, at most once (data loss tolerant), at least once (data loss intolerant), and exactly once (data loss intolerant and no extra computations), is the same as for recovery mechanisms based on the streaming window. STRAM is not able to leverage this operator for any extra optimization.

### Single vs Multi-Input Operator

A single-input operator by definition has a single upstream operator, since there can only be one writing port for a stream. If an operator has a single upstream operator, then the beginWindow on the upstream also blocks the beginWindow of the single-input operator. For an operator to start processing any window, at least one upstream operator has to start processing that window. A multi-input operator reads from more than one upstream port. Such an operator would start processing as soon as the first begin_window event arrives. However the window would not close (i.e. invoke endWindow) till all ports receive end\_window events for that windowId. Thus the end of a window is a blocking event. As we saw earlier, a multi-input operator is also the point in the DAG where the windows of all upstream operators are synchronized. The windows (atomic micro-batches) from a faster upstream operator (or one that is simply ahead in processing) are queued up till the slower upstream operator catches up. STRAM monitors such bottlenecks and takes corrective actions. The platform ensures minimal delay, i.e. processing starts as soon as at least one upstream operator has started processing.

### Recovery Mechanisms

Application developers can set any of the recovery mechanisms below to deal with node outage. In general, the cost of recovery depends on the state of the operator, while data integrity is dependent on the application. The mechanisms are per window as the platform treats windows as atomic compute units. Three recovery mechanisms are supported, namely

- At-least-once: All atomic batches are processed at least once. No data loss occurs.
- At-most-once: All atomic batches are processed at most once. Data loss is possible; this is the most efficient setting.
- Exactly-once: All atomic batches are processed exactly once. No data loss occurs; this is the least efficient setting since additional work is needed to ensure proper semantics.

At-least-once is the default. During a recovery event, the operator connects to the upstream buffer server and asks for windows to be replayed. The at-least-once and exactly-once mechanisms start from the operator's checkpointed state. At-most-once starts from the next begin-window event.

Recovery mechanisms can be specified per operator while writing the application as shown below.

<table>
<colgroup>
<col width="100%" />
</colgroup>
<tbody>
<tr class="odd">
<td align="left"><p>Operator o = dag.addOperator("operator", …);</p>
<p>dag.setAttribute(o, OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);</p></td>
</tr>
</tbody>
</table>

Also note that once an operator is attributed to AT\_MOST\_ONCE, all the operators downstream of it have to be AT\_MOST\_ONCE. The client will give appropriate warnings or errors if that's not the case.

Details are explained in the chapter on [Fault Tolerance](#h.2nusc19) below.

Streams
--------------------

A stream is a connector (edge) abstraction, and is a fundamental building block of the platform. A stream consists of tuples that flow from one port (called the output port) to one or more ports on other operators (called input ports) -- so note a potentially confusing aspect of this terminology: tuples enter a stream through its output port and leave via one or more input ports. A stream has the following characteristics

- Tuples are always delivered in the same order in which they were emitted.
- Consists of a sequence of windows one after another, each window being a collection of in-order tuples.
- A stream that connects two containers passes through a buffer server.
- All streams can be persisted (by default in HDFS).
- Exactly one output port writes to the stream.
- Can be read by one or more input ports.
- Connects operators within an application, not outside an application.
- Has a unique name within an application.
- Has attributes which act as hints to STRAM.
- Has a locality mode. Modes may be overruled (for example due to lack of containers). They are defined as follows:
  - THREAD\_LOCAL: In the same thread, uses the thread stack (intra-thread). This mode can only be used for a downstream operator which has only one input port connected; also called in-line.
  - CONTAINER\_LOCAL: In the same container (intra-process); also called in-container.
  - NODE\_LOCAL: In the same Hadoop node (inter-process, skips NIC); also called in-node.
  - RACK\_LOCAL: On nodes in the same rack; also called in-rack.
  - unspecified: No guarantee. Could be anywhere within the cluster.

An example of a stream declaration is given below

<table>
<colgroup>
<col width="100%" />
</colgroup>
<tbody>
<tr class="odd">
<td align="left"><p>DAG dag = new DAG();</p>
<p> …</p>
<p>dag.addStream("views", viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container local stream</p>
<p>dag.addStream("clicks", clickAggregate.sum, rev.data); // An example of unspecified locality</p></td>
</tr>
</tbody>
</table>

The platform guarantees in-order delivery of tuples in a stream. STRAM views each stream as a collection of ordered windows. Since no tuple can exist outside a window, a replay of a stream consists of the replay of a set of windows. When multiple input ports read the same stream, the execution plan of a stream ensures that each input port is logically not blocked by the reading of another input port. The schema of a stream is the same as the schema of the tuple.

In a stream all tuples emitted by an operator in a window belong to that window. A replay of this window would consist of an in-order replay of all those tuples. Thus the tuple order within a stream is guaranteed. However, since an operator may receive multiple streams (for example an operator with two input ports), the order of arrival of two tuples belonging to different streams is not guaranteed; in general, in an asynchronous distributed architecture this is expected. Thus an operator (especially one with multiple input ports) should not depend on the tuple order across two streams. One way to cope with this indeterminate order, if necessary, is to wait for all the tuples of a window and emit results in the endWindow call. All operator templates provided as part of the [standard operator template library](#h.3ep43zb) follow these principles.
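As an illustration of that endWindow pattern, here is a minimal sketch of a two-input operator that buffers tuples from both streams during a window and combines them only when the window closes. It uses only the DefaultInputPort/DefaultOutputPort classes shown earlier plus the assumed BaseOperator convenience class; the class and port names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;

// Buffers tuples from two streams; since their relative arrival order within a
// window is not guaranteed, the combined result is computed only in endWindow.
public class WindowJoinSketch extends BaseOperator
{
  private final List<String> left = new ArrayList<String>();
  private final List<String> right = new ArrayList<String>();

  public final transient DefaultInputPort<String> in1 = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      left.add(tuple);  // no ordering assumption relative to in2
    }
  };

  public final transient DefaultInputPort<String> in2 = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      right.add(tuple);
    }
  };

  public final transient DefaultOutputPort<String> out = new DefaultOutputPort<String>();

  @Override
  public void endWindow()
  {
    // By the time endWindow is called, all tuples of the window have arrived on both ports.
    out.emit("window totals: left=" + left.size() + ", right=" + right.size());
    left.clear();
    right.clear();
  }
}
```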
A logical stream gets partitioned into physical streams, each connecting a partition to the upstream operator. If two different attributes are needed on the same stream, it should be split using a StreamDuplicator operator.

The modes of the streams are critical for performance. An in-line stream is the most optimal as it simply delivers the tuple as-is without serialization-deserialization. Streams should be marked container\_local especially in cases where there is a large tuple volume between two operators which then drops significantly further downstream. Since the setLocality call merely provides a hint, STRAM may ignore it. An in-node stream is not as efficient as an in-line one, but it is clearly better than going off-node since it still avoids the potential bottleneck of the network card.

THREAD\_LOCAL and CONTAINER\_LOCAL streams do not use a buffer server as these streams stay within a single process. The other two modes do.

Validating an Application
--------------------------------------

The platform provides various ways of validating the application specification and data input. An understanding of these checks is very important for an application developer since it affects productivity. Validation of an application is done in three phases, namely

1. Compile Time: Caught during application development, and is most cost effective. These checks are mainly done on declarative objects and leverage the Java compiler. An example is checking that the schemas specified on all ports of a stream are mutually compatible.
2. Initialization Time: When the application is being initialized, before submitting to Hadoop. These checks are related to the configuration/context of an application, and are done by the logical DAG builder implementation. An example is checking that all non-optional ports are connected to other ports.
3. Run Time: Validations done when the application is running. This is the costliest of all checks. These are checks that can only be done at runtime as they involve data. An example is a divide-by-zero check as part of business logic.

### Compile Time

Compile time validations apply when an application is specified in Java code and include all checks that can be done by the Java compiler in the development environment (including IDEs like NetBeans or Eclipse). Examples include

1. Schema Validation: The tuples on ports are POJOs (plain old Java objects) and the compiler checks to ensure that all the ports on a stream have the same schema.
2. Stream Check: Single output port and at least one input port per stream. A stream can only have one output port writer. This is part of the addStream API. This check ensures that developers only connect one output port to a stream. The same signature also ensures that there is at least one input port for a stream.
3. Naming: Compile time checks ensure that application components (operators, streams) are named.

### Initialization/Instantiation Time

Initialization time validations include various checks that are done post compile, and before the application starts running in a cluster (or local mode). These are mainly configuration/contextual in nature. These checks are as critical to the proper functionality of the application as the compile time validations. Examples include

- [JavaBeans Validation](http://docs.oracle.com/javaee/6/tutorial/doc/gircz.html): Examples include
  - @Max(): Value must be less than or equal to the number
  - @Min(): Value must be greater than or equal to the number
  - @NotNull: The value of the field or property must not be null
  - @Pattern(regexp = "...."): Value must match the regular expression
- Input port connectivity: By default, every non-optional input port must be connected. A port can be declared optional by using an annotation: @InputPortFieldAnnotation(name = "...", optional = true)
- Output port connectivity: Similar. The annotation here is: @OutputPortFieldAnnotation(name = "...", optional = true)
- Unique names in application scope: Operators and streams must have unique names.
- Cycles in the DAG: The DAG cannot have a cycle.
- Unique names in operator scope: Ports, properties, and annotations must have unique names.
- One stream per port: A port can connect to only one stream. This check applies to input as well as output ports even though an output port can technically write to two streams. If you must have two streams originating from a single output port, use a StreamDuplicator operator.
- Application Window Period: Has to be an integral multiple of the streaming window period.

### Run Time

Run time checks are those that are done when the application is running. The real-time streaming platform provides rich run time error handling mechanisms. The checks are exclusively done by the application business logic, but the platform allows applications to count and audit these. Some of these features are in the process of development (backend and UI) and this section will be updated as they are developed. Upon completion, examples will be added to the [demos](#h.upglbi) to illustrate these.

Error ports are output ports with error annotations. Since they are normal ports, they can be monitored, their tuples counted and persisted, and the counts shown in the UI.

------------------------------------------------------------------------

Multi-Tenancy and Security
=======================================

Hadoop is a multi-tenant distributed operating system. Security is an intrinsic element of multi-tenancy as without it a cluster cannot reasonably be shared among enterprise applications. Streaming applications follow all multi-tenancy security models used in Hadoop as they are native Hadoop applications. For details refer to the [Operation and Installation Guide](https://www.datatorrent.com/docs/guides/OperationandInstallationGuide.html).

Security
---------------------

The platform includes Kerberos support. Both access points, namely STRAM and the buffer server, are secure. STRAM passes the token over to StreamingContainer, which then gives it to the buffer server. The most important aspect for an application developer is to note that STRAM is the single point of access, ensuring that security measures are taken by all components of the platform.

Resource Limits
----------------------------

Hadoop enforces quotas on resources. This includes hard disk (namespace and total disk quota) as well as priority queues for schedulers. The platform uses Hadoop resource limits to manage a streaming application. In addition, network I/O quotas can be enforced. An operator can be dynamically partitioned if it reaches its resource limits; these limits may be expressed in terms of throughput, latency, or just the aggregate resource utilization of a container.

------------------------------------------------------------------------

Scalability and Partitioning
=========================================

Scalability is a foundational element of this platform and is a building block for an eco-system where big data meets real time. Enterprises need to continually meet SLAs as data grows. Without the ability to scale as load grows, or as new applications with higher loads come to fruition, enterprise grade SLAs cannot be met. A big issue with the streaming application space is that it is not just about high load, but also about the fluctuations in it. There is no way to guarantee future load requirements, and there is a big difference between high and low load within a day for the same feed. Traditional streaming platforms solve these two cases by simply throwing more hardware at the problem.

Daily spikes are managed by ensuring enough hardware for peak load, which then idles during low load, and future needs are handled by a very costly re-architecture, or by investing heavily in building a scalable distributed operating system. Another salient and often overlooked cost is the need to manage the SLA -- let's call it buffer capacity. Since daily peaks fluctuate, meeting the SLA means allocating resources over and above the average peak load. For example an average peak load of 100 resource units (CPU and/or memory and/or network) may mean allocating about 200 resource units to be safe. A distributed cluster that cannot dynamically scale up and down in effect pays for buffer capacity per application. Another big aspect of streaming applications is that the load is not just the ingestion rate; more often than not, the internal operators produce a lot more events than the ingestion rate. For example a dimensional data computation (with, say, d dimensions) needs 2\*\*d - 1 computations per ingested event. A lot of applications have over 10 dimensions, i.e. over 1000 computations per incoming event, and these need to be distributed across the cluster, thereby causing an explosion in the throughput (events/sec) that needs to be managed.

The platform is designed to handle such cases at a very low cost. The platform scales linearly with Hadoop. If applications need more resources, the enterprise can simply add more commodity nodes to Hadoop without any downtime, and the Hadoop native platform will take care of the rest. If some nodes go bad, these can be removed without downtime. The daily peaks and valleys in the load are managed by the platform by dynamically scaling up at the peak and then giving the resources back to Hadoop during low load. This means that a properly designed Hadoop cluster does several things for enterprises: (a) reduces the cost of hardware due to the use of commodity hardware, (b) shares buffer capacity across all applications as the peaks of all applications may not align, and (c) raises the average CPU usage on a 24x7 basis. As a general design this is similar to the scale that a map-reduce application can deliver. In the following sections of this chapter we will see how this is done.

Partitioning
-------------------------

If all tuples sent through the stream(s) that are connected to the input port(s) of an operator in the DAG are received by a single physical instance of that operator, that operator can become a performance bottleneck. This leads to scalability issues when throughput, memory, or CPU needs exceed the processing capacity of that single instance.

To address the problem, the platform offers the capability to partition the inflow of data so that it is divided across multiple physical instances of a logical operator in the DAG. There are two functional ways to partition

- Load balance: The incoming load is simply partitioned into stream(s) that go to separate instances of physical operators, and scalability is achieved by adding more physical operators. Each tuple is sent to a physical operator (partition) based on a round-robin or other similar algorithm. This scheme scales linearly. A lot of key based computations can load balance in the platform due to the ability to insert Unifiers. For many computations, the endWindow and Unifier setup is similar to the combiner and reducer mechanism in a Map-Reduce computation.
- Sticky Key: The key assertion is that the distribution of tuples is sticky, i.e. data with the same key will always be processed by the same physical operator, no matter how many times it is sent through the stream. This stickiness will continue even if the number of partitions grows dynamically, and it can eventually be leveraged for advanced features like bucket testing. How this is accomplished and what is required to develop compliant operators will be explained below.

We plan to add more partitioning mechanisms to the platform over time as needed by emerging usage patterns. The aim is to allow enterprises to focus on their business logic, and to significantly reduce the cost of operability. As an enabling technology for managing high loads, this platform provides enterprises with a significant innovative edge. Scalability and partitioning is a foundational building block for this platform.

### Sticky Partition vs Round Robin

As noted above, partitioning via sticky key is data aware but round-robin partitioning is not. An example of non-sticky load balancing would be round robin distribution over multiple instances, where, for example, a tuple stream of A, A, A with 3 physical operator instances would result in the processing of a single A by each of the instances. In contrast, sticky partitioning means that exactly one instance of the operator will process all of the A tuples if they fall into the same bucket, while B tuples may be processed by another instance. Data aware mapping of tuples to partitions (similar to a distributed hash table) is accomplished via stream codecs. In later sections we will show how these two approaches can be used in combination.

### Stream Codec

The platform does not make assumptions about the tuple type; it could be any Java object. The operator developer knows what tuple type an input port expects and is capable of processing. Each input port has an associated stream codec that defines how data is serialized when transmitted over a socket stream; it also defines a function that computes the partition hash key for the tuple. The engine uses that key to determine which physical instance(s) of a partitioned operator receive that tuple. For this to work, consistent hashing is required.
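To make this concrete, here is a sketch of a custom codec that makes partitioning sticky on one field of the tuple. It assumes the KryoSerializableStreamCodec base class from the operator library (which supplies the serialization half of the contract) and a hypothetical Quote tuple class; names and packages may differ in your version.

```java
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

// Partition hash computed from the "symbol" field, so all quotes for the same
// symbol are processed by the same physical partition.
public class QuoteSymbolStreamCodec extends KryoSerializableStreamCodec<Quote>
{
  @Override
  public int getPartition(Quote tuple)
  {
    return tuple.getSymbol().hashCode();
  }
}

// Hypothetical tuple type used only for this illustration.
class Quote
{
  private String symbol;

  public String getSymbol()
  {
    return symbol;
  }

  public void setSymbol(String symbol)
  {
    this.symbol = symbol;
  }
}
```

The codec would then be associated with the input port that should receive the sticky distribution, for example by overriding that port's getStreamCodec() method.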
The default codec uses the Java Object\#hashCode function, which is sufficient for basic types such as Integer, String etc. It will also work with custom tuple classes as long as they implement hashCode appropriately. Reliance on hashCode may not work when generic containers are used that do not hash the actual data, such as standard collection classes (HashMap etc.), in which case a custom stream codec must be assigned to the input port.

### Static Partitioning

DAG designers can specify at design time how they would like certain operators to be partitioned. STRAM then instantiates the DAG with the physical plan which adheres to the partitioning scheme defined by the design. This plan is the initial partition of the application. In other words, Static Partitioning is used to tell STRAM to compute the physical DAG from a logical DAG once, without taking into consideration runtime states or loads of various operators.

### Dynamic Partitioning

In streaming applications the load changes during the day, thus creating situations where the number of partitioned operator instances needs to adjust dynamically. The load can be measured in terms of processing within the
<TRUNCATED>
