Repository: incubator-apex-core Updated Branches: refs/heads/APEXCORE-293 66e86d7dc -> 254beb4f7 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/dtcli.md ---------------------------------------------------------------------- diff --git a/dtcli.md b/dtcli.md deleted file mode 100644 index 813a27f..0000000 --- a/dtcli.md +++ /dev/null @@ -1,273 +0,0 @@ -Apache Apex Command Line Interface -================================================================================ - -dtCli, the Apache Apex command line interface, can be used to launch, monitor, and manage -Apache Apex applications. dtCli is a wrapper around the [REST API](dtgateway_api.md) provided by dtGateway, and -provides a developer-friendly way of interacting with the Apache Apex platform. By hiding the low-level details of the REST API, -the CLI exposes a much higher-level feature set. Another advantage of dtCli is scope: after connecting to an application, commands are executed in the context -of that specific application. dtCli enables easy integration with existing enterprise toolsets for automated application monitoring -and management. Currently the following high-level tasks are supported: - -- Launch or kill applications -- View system metrics including load, throughput, latency, etc. -- Start or stop tuple recording -- Read operator, stream and port properties and attributes -- Write to operator properties -- Dynamically change the application logical plan -- Create custom macros - - -## dtcli Commands - -dtCli can be launched by running the following command on the same machine where dtGateway was installed: - - dtcli - -Help on all commands is available via the `help` command in the CLI. - -### Global Commands - -``` -GLOBAL COMMANDS EXCEPT WHEN CHANGING LOGICAL PLAN: - -alias alias-name command - Create a command alias - -begin-macro name - Begin Macro Definition ($1...$9 to access parameters and type 'end' to end the definition) - -connect app-id - Connect to an app - -dump-properties-file out-file jar-file class-name - Dump the properties file of an app class - -echo [arg ...]
- Echo the arguments - -exit - Exit the CLI - -get-app-info app-id - Get the information of an app - -get-app-package-info app-package-file - Get info on the app package file - -get-app-package-operator-properties app-package-file operator-class - Get operator properties within the given app package - -get-app-package-operators [options] app-package-file [search-term] - Get operators within the given app package - Options: - -parent Specify the parent class for the operators - -get-config-parameter [parameter-name] - Get the configuration parameter - -get-jar-operator-classes [options] jar-files-comma-separated [search-term] - List operators in a jar list - Options: - -parent Specify the parent class for the operators - -get-jar-operator-properties jar-files-comma-separated operator-class-name - List properties in specified operator - -help [command] - Show help - -kill-app app-id [app-id ...] - Kill an app - - launch [options] jar-file/json-file/properties-file/app-package-file [matching-app-name] - Launch an app - Options: - -apconf <app package configuration file> Specify an application - configuration file - within the app - package if launching - an app package. - -archives <comma separated list of archives> Specify comma - separated archives - to be unarchived on - the compute machines. - -conf <configuration file> Specify an - application - configuration file. - -D <property=value> Use value for given - property. - -exactMatch Only consider - applications with - exact app name - -files <comma separated list of files> Specify comma - separated files to - be copied on the - compute machines. - -ignorepom Do not run maven to - find the dependency - -libjars <comma separated list of libjars> Specify comma - separated jar files - or other resource - files to include in - the classpath. - -local Run application in - local mode. - -originalAppId <application id> Specify original - application - identifier for restart. 
- -queue <queue name> Specify the queue to - launch the application - -list-application-attributes - Lists the application attributes -list-apps [pattern] - List applications -list-operator-attributes - Lists the operator attributes -list-port-attributes - Lists the port attributes -set-pager on/off - Set the pager program for output -show-logical-plan [options] jar-file/app-package-file [class-name] - List apps in a jar or show logical plan of an app class - Options: - -exactMatch Only consider exact match - for app name - -ignorepom Do not run maven to find - the dependency - -libjars <comma separated list of jars> Specify comma separated - jar/resource files to - include in the classpath. -shutdown-app app-id [app-id ...] - Shutdown an app -source file - Execute the commands in a file -``` - -### Commands after connecting to an application - -``` -COMMANDS WHEN CONNECTED TO AN APP (via connect <appid>) EXCEPT WHEN CHANGING LOGICAL PLAN: - -begin-logical-plan-change - Begin Logical Plan Change -dump-properties-file out-file [jar-file] [class-name] - Dump the properties file of an app class -get-app-attributes [attribute-name] - Get attributes of the connected app -get-app-info [app-id] - Get the information of an app -get-operator-attributes operator-name [attribute-name] - Get attributes of an operator -get-operator-properties operator-name [property-name] - Get properties of a logical operator -get-physical-operator-properties [options] operator-id - Get properties of a physical operator - Options: - -propertyName <property name> The name of the property whose - value needs to be retrieved - -waitTime <wait time> How long to wait to get the result -get-port-attributes operator-name port-name [attribute-name] - Get attributes of a port -get-recording-info [operator-id] [start-time] - Get tuple recording info -kill-app [app-id ...] - Kill an app -kill-container container-id [container-id ...] 
- Kill a container -list-containers - List containers -list-operators [pattern] - List operators -set-operator-property operator-name property-name property-value - Set a property of an operator -set-physical-operator-property operator-id property-name property-value - Set a property of an operator -show-logical-plan [options] [jar-file/app-package-file] [class-name] - Show logical plan of an app class - Options: - -exactMatch Only consider exact match - for app name - -ignorepom Do not run maven to find - the dependency - -libjars <comma separated list of jars> Specify comma separated - jar/resource files to - include in the classpath. -show-physical-plan - Show physical plan -shutdown-app [app-id ...] - Shutdown an app -start-recording operator-id [port-name] [num-windows] - Start recording -stop-recording operator-id [port-name] - Stop recording -wait timeout - Wait for completion of current application -``` - -### Commands when changing the logical plan - -``` -COMMANDS WHEN CHANGING LOGICAL PLAN (via begin-logical-plan-change): - -abort - Abort the plan change -add-stream-sink stream-name to-operator-name to-port-name - Add a sink to an existing stream -create-operator operator-name class-name - Create an operator -create-stream stream-name from-operator-name from-port-name to-operator-name to-port-name - Create a stream -help [command] - Show help -remove-operator operator-name - Remove an operator -remove-stream stream-name - Remove a stream -set-operator-attribute operator-name attr-name attr-value - Set an attribute of an operator -set-operator-property operator-name property-name property-value - Set a property of an operator -set-port-attribute operator-name port-name attr-name attr-value - Set an attribute of a port -set-stream-attribute stream-name attr-name attr-value - Set an attribute of a stream -show-queue - Show the queue of the plan change -submit - Submit the plan change -``` - - - -## Examples - -An example of defining a custom macro. 
-The macro updates a running application by inserting a new operator. It takes three parameters and executes a logical plan change. - -``` -dt> begin-macro add-console-output -macro> begin-logical-plan-change -macro> create-operator $1 com.datatorrent.lib.io.ConsoleOutputOperator -macro> create-stream stream_$1 $2 $3 $1 in -macro> submit -macro> end -``` - - -Then execute the `add-console-output` macro like this: - -``` -dt> add-console-output xyz opername portname -``` - -This macro then expands to run the following commands: - -``` -begin-logical-plan-change -create-operator xyz com.datatorrent.lib.io.ConsoleOutputOperator -create-stream stream_xyz opername portname xyz in -submit -``` - - -*Note*: To add new operators through runtime logical plan changes, the operator classes -must be part of the jar files that were deployed at application launch time. http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppConfig/ApplicationConfigurationPackages.html-image00.png ---------------------------------------------------------------------- diff --git a/images/AppConfig/ApplicationConfigurationPackages.html-image00.png b/images/AppConfig/ApplicationConfigurationPackages.html-image00.png deleted file mode 100644 index 30ad3e4..0000000 Binary files a/images/AppConfig/ApplicationConfigurationPackages.html-image00.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppConfig/ApplicationConfigurationPackages.html-image01.png ---------------------------------------------------------------------- diff --git a/images/AppConfig/ApplicationConfigurationPackages.html-image01.png b/images/AppConfig/ApplicationConfigurationPackages.html-image01.png deleted file mode 100644 index 5b3623d..0000000 Binary files a/images/AppConfig/ApplicationConfigurationPackages.html-image01.png and /dev/null differ
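The `add-console-output` example in the deleted dtcli.md above depends on dtCli substituting the positional parameters `$1` to `$9` into every stored command line when the macro is invoked. The Java sketch below models only that substitution step, so the expansion shown in the example can be checked mechanically. It is a hypothetical illustration: `MacroSketch`, `addLine` and `expand` are invented names, and this is not how dtCli itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical model of a dtCli macro: a named list of command templates. */
class MacroSketch {
    // Matches $1 through $9, the positional parameters a macro body may use.
    private static final Pattern PARAM = Pattern.compile("\\$([1-9])");

    private final String name;
    private final List<String> body = new ArrayList<>();

    MacroSketch(String name) {
        this.name = name;
    }

    /** Records one line typed at the "macro>" prompt (the terminating 'end' is not stored). */
    void addLine(String line) {
        body.add(line);
    }

    /** Returns the stored lines with every $N replaced by the N-th argument. */
    List<String> expand(String... args) {
        List<String> commands = new ArrayList<>();
        for (String line : body) {
            Matcher m = PARAM.matcher(line);
            StringBuffer sb = new StringBuffer();
            while (m.find()) {
                int index = Integer.parseInt(m.group(1)) - 1; // $1 -> args[0]
                if (index >= args.length) {
                    throw new IllegalArgumentException(
                        "macro " + name + " references $" + (index + 1)
                            + " but got " + args.length + " argument(s)");
                }
                // quoteReplacement keeps a '$' or '\' inside an argument literal
                m.appendReplacement(sb, Matcher.quoteReplacement(args[index]));
            }
            m.appendTail(sb);
            commands.add(sb.toString());
        }
        return commands;
    }
}
```

Storing the four body lines of the example and calling `expand("xyz", "opername", "portname")` yields exactly the four commands listed in the example. The sketch rejects a reference to a parameter that was not supplied; how dtCli itself handles that case is not described in this document.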
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppConfig/ApplicationConfigurationPackages.html-image02.png ---------------------------------------------------------------------- diff --git a/images/AppConfig/ApplicationConfigurationPackages.html-image02.png b/images/AppConfig/ApplicationConfigurationPackages.html-image02.png deleted file mode 100644 index 65a8aee..0000000 Binary files a/images/AppConfig/ApplicationConfigurationPackages.html-image02.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppPackage/ApplicationPackages.html-image00.png ---------------------------------------------------------------------- diff --git a/images/AppPackage/ApplicationPackages.html-image00.png b/images/AppPackage/ApplicationPackages.html-image00.png deleted file mode 100644 index 5b3623d..0000000 Binary files a/images/AppPackage/ApplicationPackages.html-image00.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppPackage/ApplicationPackages.html-image01.png ---------------------------------------------------------------------- diff --git a/images/AppPackage/ApplicationPackages.html-image01.png b/images/AppPackage/ApplicationPackages.html-image01.png deleted file mode 100644 index f707d81..0000000 Binary files a/images/AppPackage/ApplicationPackages.html-image01.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/AppPackage/ApplicationPackages.html-image02.png ---------------------------------------------------------------------- diff --git a/images/AppPackage/ApplicationPackages.html-image02.png b/images/AppPackage/ApplicationPackages.html-image02.png deleted file mode 100644 index 49de0e9..0000000 Binary files a/images/AppPackage/ApplicationPackages.html-image02.png and /dev/null differ 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/MalharOperatorOverview.png ---------------------------------------------------------------------- diff --git a/images/MalharOperatorOverview.png b/images/MalharOperatorOverview.png deleted file mode 100644 index 40bee4a..0000000 Binary files a/images/MalharOperatorOverview.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/apex_logo.png ---------------------------------------------------------------------- diff --git a/images/apex_logo.png b/images/apex_logo.png deleted file mode 100644 index baa25ca..0000000 Binary files a/images/apex_logo.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image00.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image00.png b/images/application_development/ApplicationDeveloperGuide.html-image00.png deleted file mode 100644 index 87ebce4..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image00.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image01.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image01.png b/images/application_development/ApplicationDeveloperGuide.html-image01.png deleted file mode 100644 index 4cdee33..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image01.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image02.png 
---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image02.png b/images/application_development/ApplicationDeveloperGuide.html-image02.png deleted file mode 100644 index 5bf041c..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image02.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image03.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image03.png b/images/application_development/ApplicationDeveloperGuide.html-image03.png deleted file mode 100644 index e00bba5..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image03.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image04.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image04.png b/images/application_development/ApplicationDeveloperGuide.html-image04.png deleted file mode 100644 index 8f62361..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image04.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image05.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image05.png b/images/application_development/ApplicationDeveloperGuide.html-image05.png deleted file mode 100644 index f9ea8d9..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image05.png and 
/dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image06.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image06.png b/images/application_development/ApplicationDeveloperGuide.html-image06.png deleted file mode 100644 index 346690c..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image06.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image07.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image07.png b/images/application_development/ApplicationDeveloperGuide.html-image07.png deleted file mode 100644 index e57a8eb..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image07.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image08.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image08.png b/images/application_development/ApplicationDeveloperGuide.html-image08.png deleted file mode 100644 index a363f94..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image08.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/application_development/ApplicationDeveloperGuide.html-image09.png ---------------------------------------------------------------------- diff --git a/images/application_development/ApplicationDeveloperGuide.html-image09.png 
b/images/application_development/ApplicationDeveloperGuide.html-image09.png deleted file mode 100644 index 8a0252b..0000000 Binary files a/images/application_development/ApplicationDeveloperGuide.html-image09.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/autometrics/adt.png ---------------------------------------------------------------------- diff --git a/images/autometrics/adt.png b/images/autometrics/adt.png deleted file mode 100644 index 187bbd4..0000000 Binary files a/images/autometrics/adt.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/autometrics/dashboard.png ---------------------------------------------------------------------- diff --git a/images/autometrics/dashboard.png b/images/autometrics/dashboard.png deleted file mode 100644 index c4ebb39..0000000 Binary files a/images/autometrics/dashboard.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/autometrics/visualize.png ---------------------------------------------------------------------- diff --git a/images/autometrics/visualize.png b/images/autometrics/visualize.png deleted file mode 100644 index fb2e780..0000000 Binary files a/images/autometrics/visualize.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image00.png ---------------------------------------------------------------------- diff --git a/images/operator/image00.png b/images/operator/image00.png deleted file mode 100644 index 14588aa..0000000 Binary files a/images/operator/image00.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image01.png ---------------------------------------------------------------------- diff --git a/images/operator/image01.png b/images/operator/image01.png deleted file mode 100644 index 626c6b5..0000000 Binary files 
a/images/operator/image01.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image02.png ---------------------------------------------------------------------- diff --git a/images/operator/image02.png b/images/operator/image02.png deleted file mode 100644 index 2be9433..0000000 Binary files a/images/operator/image02.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image03.png ---------------------------------------------------------------------- diff --git a/images/operator/image03.png b/images/operator/image03.png deleted file mode 100644 index 67802d8..0000000 Binary files a/images/operator/image03.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image04.png ---------------------------------------------------------------------- diff --git a/images/operator/image04.png b/images/operator/image04.png deleted file mode 100644 index 58d99d9..0000000 Binary files a/images/operator/image04.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/images/operator/image05.png ---------------------------------------------------------------------- diff --git a/images/operator/image05.png b/images/operator/image05.png deleted file mode 100644 index 9ac6f21..0000000 Binary files a/images/operator/image05.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/44f220fd/operator_development.md ---------------------------------------------------------------------- diff --git a/operator_development.md b/operator_development.md deleted file mode 100644 index f502725..0000000 --- a/operator_development.md +++ /dev/null @@ -1,449 +0,0 @@ -Operator Development Guide -========================== - -Operators are the basic building blocks of an application built to run on -the Apache Apex platform.
An application may consist of one or more -operators, each of which defines some logical operation to be performed on the -tuples arriving at the operator. These operators are connected together -using streams, forming a Directed Acyclic Graph (DAG). In other words, a streaming -application is represented by a DAG that consists of operations (called operators) and -data flows (called streams). - -In this document we will discuss how an operator works and -its internals. This document is intended to serve the following purposes: - -1. **[Apache Apex Operators](#apex_operators)** - Introduction to operator terminology and concepts. -2. **[Writing Custom Operators](#writing_custom_operators)** - Designing, coding and testing new operators from scratch. Includes code examples. -3. **[Operator Reference](#operator_reference)** - Details of operator internals, lifecycle, best practices and optimizations. - -* * * * * - -Apache Apex Operators <a name="apex_operators"></a> -========================================== - -Operators - "What" in a nutshell --------------------------------- - -Operators are independent units of logical operations which can -contribute to executing the business logic of a use case. For example, -in an ETL workflow, a filtering operation can be represented by a single -operator. This filtering operator will be responsible for doing just one -task in the ETL pipeline, i.e. filtering incoming tuples. Operators do not -impose any restrictions on what can or cannot be done as part of an -operator. An operator may even contain the entire business logic. -However, it is recommended that operators be lightweight -independent tasks, in -order to take advantage of the distributed framework that Apache Apex -provides. The structure of a streaming application resembles -the way CPU pipelining works. CPU pipelining breaks down the -computation engine into different stages, viz. instruction fetch, -instruction decode, etc.
so that each of them can perform its task on -different instructions -in parallel. Similarly, -Apache Apex APIs allow the user to break down a task into different -stages so that all of the stages can be executed on different tuples -in parallel. - - - -Operators - "How" in a nutshell -------------------------------- - -An Apache Apex application runs as a YARN application. Hence, each of -the operators that the application DAG contains runs in one of the -containers provisioned by YARN. Further, Apache Apex exposes APIs that -allow the user to request bundling multiple operators in a single node, -a single container or even a single thread. We shall look at these calls -in the reference sections. For now, think of -an operator as a piece of code that runs on some machine of a YARN -cluster. - -Types of Operators ------------------- - -An operator works on one tuple at a time. These tuples may be supplied -by other operators in the application or by external sources, -such as a database or a message bus. Similarly, after the tuples are -processed, they may be passed on to other operators or stored in an external system. -There are 3 types of operators based on function: - -1. **Input Adapter** - This is one of the starting points in - the application DAG and is responsible for getting tuples from an - external system. Alternatively, such data may be generated - by the operator itself, without interacting with the outside - world. These input tuples will form the initial universe of - data that the application works on. -2. **Generic Operator** - This type of operator accepts input tuples from - the previous operators, processes them and passes the results on to the following operators - in the DAG. -3. **Output Adapter** - This is one of the ending points in the application - DAG and is responsible for writing the data out to some external - system. - -Note: There can be multiple operators of each type in an application -DAG.
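To make the three roles concrete, the following self-contained Java sketch wires an input adapter, a generic operator and an output adapter into a tiny linear DAG. The `OutPort` class, the operator classes and `PipelineDemo` are hypothetical stand-ins written for this illustration; they are not the Apache Apex API (real operators use classes such as `DefaultInputPort` and `DefaultOutputPort` and are driven by the engine, not by direct method calls).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

/** Hypothetical output port: forwards each emitted tuple to the connected consumer. */
class OutPort<T> {
    private Consumer<T> sink = tuple -> { }; // an unconnected port drops tuples

    void connect(Consumer<T> downstream) {
        this.sink = downstream;
    }

    void emit(T tuple) {
        sink.accept(tuple);
    }
}

/** Input adapter: no input port; produces tuples itself (here from a fixed list). */
class LineSource {
    final OutPort<String> output = new OutPort<>();
    private final List<String> lines;

    LineSource(List<String> lines) {
        this.lines = lines;
    }

    /** Plays the role of emitTuples(): pushes data into the DAG. */
    void emitTuples() {
        for (String line : lines) {
            output.emit(line);
        }
    }
}

/** Generic operator: one input (process) and one output port. */
class UpperCaser {
    final OutPort<String> output = new OutPort<>();

    void process(String tuple) {
        output.emit(tuple.toUpperCase(Locale.ROOT));
    }
}

/** Output adapter: input only; "writes" to an external system, modeled as a list. */
class CollectingSink {
    final List<String> written = new ArrayList<>();

    void process(String tuple) {
        written.add(tuple);
    }
}

class PipelineDemo {
    /** Wires source -> upper-caser -> sink, then runs one pass of the source. */
    static List<String> run(List<String> input) {
        LineSource source = new LineSource(input);
        UpperCaser upper = new UpperCaser();
        CollectingSink sink = new CollectingSink();
        source.output.connect(upper::process); // "stream" from source to generic operator
        upper.output.connect(sink::process);   // "stream" from generic operator to sink
        source.emitTuples();
        return sink.written;
    }
}
```

For example, `PipelineDemo.run(Arrays.asList("humpty", "dumpty"))` returns `[HUMPTY, DUMPTY]`. Connecting a port to a method reference stands in for a stream here; the sketch deliberately ignores windows, containers and serialization.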
- -Operator Positions in a DAG ----------------------------------- - -We may refer to operators depending on their position with respect to -one another. For any operator opr (see image below), there are two types of related operators: - -1. **Upstream operators** - These are the operators from which there is a - directed path to opr in the application DAG. -2. **Downstream operators** - These are the operators to which there is a - directed path from opr in the application DAG. - -Note that the application DAG contains no cycles, so no operator can be both upstream and downstream of opr. - - - -Ports ------ - -Operators in a DAG are connected together via directed flows -called streams. Each stream has end-points located on the operators, -called ports. There are 2 types of ports: - -1. **Input Port** - This is a port through which an operator accepts input - tuples from an upstream operator. -2. **Output Port** - This is a port through which an operator passes on the - processed data to downstream operators. - -In terms of ports, an Input Adapter is an operator -with no input ports, a Generic Operator has both input and output ports, -while an Output Adapter has no output ports. Note, however, that -an operator may act as an Input Adapter while also having an -input port. In such cases, the operator gets data from two -different sources, viz. the input stream from the input port and an -external source. - - - -* * * * * - -How an Operator Works ----------------------- - -An operator passes through various stages during its lifetime. Each -stage is an API call that the Streaming Application Master makes for an -operator. The following figure illustrates the stages through which an -operator passes. - - - -- The _setup()_ call initializes the operator and prepares it to - start processing tuples. -- The _beginWindow()_ call marks the beginning of an application window - and allows for any processing to be done before a window starts.
-- The _process()_ call belongs to the _InputPort_ and gets triggered when - any tuple arrives at the input port of the operator. This call is - specific to Generic Operators and Output Adapters, since Input Adapters - do not have an input port. It is made for every tuple arriving at the - input port until the end window marker tuple is received on the - input port. -- The _emitTuples()_ call is the counterpart of the _process()_ call for Input - Adapters. - It is used by Input Adapters to emit any tuples that are - fetched from external systems or generated by the operator. - This method is called repeatedly until the pre-configured window - time has elapsed, at which point the end window marker tuple is sent out on - the output port. -- The _endWindow()_ call marks the end of the window and allows for any - processing to be done after the window ends. -- The _teardown()_ call is used for gracefully shutting down the - operator and releasing any resources held by the operator. - -Developing Custom Operators <a name="writing_custom_operators"></a> -==================================== - -About this tutorial -------------------- - -This tutorial will guide the user through developing an operator from -scratch. It covers all aspects of writing an operator, including -design, code and unit testing. - -Introduction ------------- - -In this tutorial, we will design and write, from scratch, an operator -called Word Count. This operator will accept tuples of type String, -count the number of occurrences of each word appearing in the tuple and -send out the updated counts for all the words encountered in the tuple. -Further, the operator will also accept the path of a file on HDFS which -contains the stop-words to be ignored when counting -occurrences. - -Design ------- - -An operator should be designed before any of its code is -written. Many aspects including the functionality, the data sources, -the types involved etc.
need to be first finalized before writing the -operator. Let us dive into each of these while considering the Word -Count operator. - -### Functionality - -We can define the scope of operator functionality using the following -tasks: - -1. Parse the input tuple to identify the words in the tuple -2. Identify the stop-words in the tuple by looking up the stop-word - file as configured -3. For each non-stop-word in the tuple, count the occurrences in that - tuple and add them to the global counts - -Let's consider an example. Suppose the following tuples flow -into the Word Count operator: - -1. _Humpty dumpty sat on a wall_ -2. _Humpty dumpty had a great fall_ - -Initially, the counts for all words are 0. Once the first tuple is processed, -the counts that must be emitted are: - -``` -humpty - 1 -dumpty - 1 -sat - 1 -wall - 1 -``` - -Note that we are ignoring the stop-words "on" and "a" in this case. -Also note that, as a rule, we'll ignore the case of the words when -counting occurrences. - -Similarly, after the second tuple is processed, the counts that must be -emitted are: - -``` -humpty - 2 -dumpty - 2 -great - 1 -fall - 1 -``` - -Again, we ignore the words _"had"_ and _"a"_ since these are stop-words. - -Note that the most recent count for any word is the correct count for that -word. In other words, any new output for a word invalidates all the -previous counts for that word. - -### Inputs - -As seen from the example above, the following inputs are expected for -the operator: - -1. Input stream whose tuple type is String -2. Input HDFS file path, pointing to a file containing stop-words - -Only one input port is needed. The stop-word file is expected to be small enough -to be read completely in a single read, and reading it is a one-time -activity for the lifetime of the operator, so it does not need a -separate input port. - - - -### Outputs - -We can define the output for this operator in multiple ways. - -1.
The operator may send out the counts that - have changed after processing each tuple. -2. Some applications might not need an update after every tuple, but - only after a certain time duration. - -Let us implement both options and choose between them through the -configuration. Let us define a boolean configuration parameter -_sendPerTuple_. The value of this parameter will indicate whether the -updated counts for words need to be emitted after processing each -tuple (true) or after a certain time duration (false). - -The type of information the operator sends out on the output -port is the same in both cases. This will be a _< key, value >_ pair, -where the key is the word and the value is the latest count for that -word. This means we need just one output port on which this information -will go out. - - - -Configuration -------------- - -We have the following configuration parameters: - -1. _stopWordFilePath_ - This parameter will store the path to the stop - word file on HDFS as configured by the user. -2. _sendPerTuple_ - This parameter decides whether we send out the - updated counts after processing each tuple or at the end of a - window. When set to true, the operator will send out the updated - counts after each tuple; otherwise it will send them at the end of - each window. - -Code ----- - -The source code for the tutorial can be found here: - -[https://github.com/DataTorrent/examples/tree/master/tutorials/operatorTutorial](https://github.com/DataTorrent/examples/tree/master/tutorials/operatorTutorial) - - -Operator Reference <a name="operator_reference"></a> -==================================== - - -### The Operator Class - -The operator will exist physically as a class which implements the -Operator interface.
This interface requires implementations of the
following methods:

- setup(OperatorContext context)
- beginWindow(long windowId)
- endWindow()
- teardown()

To simplify the creation of an operator, the Apache Apex
library also provides a base class, "BaseOperator", which has empty
implementations of these methods. Please refer to the [Apex Operators](#apex_operators) section and the
[Reference](#operator_reference) section for details on these.

We extend the class "BaseOperator" to create our own operator,
"WordCountOperator".

``` java
public class WordCountOperator extends BaseOperator
{
}
```

### Class (Operator) properties

We define the following class variables:

- _sendPerTuple_ - Configures the output frequency of the operator
``` java
private boolean sendPerTuple = true; // default
```
- _stopWordFilePath_ - Stores the path to the stop-words file on HDFS
``` java
private String stopWordFilePath; // no default
```
- _stopWords_ - Stores the stop words read from the configured file
``` java
private transient String[] stopWords;
```
- _globalCounts_ - A Map that stores the counts of all the words
  encountered so far. Note that this variable is non-transient, which
  means it is saved as part of the checkpoint and can be recovered in the event of a crash.
``` java
private Map<String, Long> globalCounts;
```
- _updatedCounts_ - A Map that stores the counts for only the most
  recent tuple(s). The sendPerTuple setting determines whether it holds the counts for
  the most recent tuple or for the tuples of the current window.
``` java
private transient Map<String, Long> updatedCounts;
```
- _input_ - The input port for the operator. The type of this input port
  is String, which means it will only accept tuples of type String.
  The definition of an input port requires implementing a method
  called process(String tuple), which contains the processing logic
  for each tuple that arrives at this input port. We delegate
  this task to another method, processTuple(String tuple). This
  keeps the operator class extensible: a subclass can change the
  processing logic for input tuples by overriding processTuple.
``` java
public transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
  @Override
  public void process(String tuple)
  {
    // Delegate to an overridable method that holds the actual logic.
    processTuple(tuple);
  }
};
```
- _output_ - The output port for the operator. The type of this port is
  Entry<String, Long>, which means the operator will emit < word,
  count > pairs for the updated counts.
``` java
// Entry refers to java.util.Map.Entry
public transient DefaultOutputPort<Entry<String, Long>> output = new DefaultOutputPort<Entry<String, Long>>();
```

### The Constructor

The constructor is the place where we initialize the non-transient data
structures, since the constructor is called just once per activation of an operator.
For the Word Count operator, we initialize the globalCounts variable in the constructor.
``` java
public WordCountOperator()
{
  // Maps is Guava's com.google.common.collect.Maps
  globalCounts = Maps.newHashMap();
}
```
### Setup call

The setup method is called once per activation of the operator, before it processes any data,
and its purpose is to allow the operator to set itself up for processing incoming streams.
Transient objects in the operator are not serialized or checkpointed, so it is essential
that such objects are initialized in the setup call. In case of operator failure, the operator
will be redeployed (most likely in a different container). The setup method, called by the
Apache Apex engine, allows the operator to prepare for execution in the new container.

The following tasks are executed as part of the setup call:

1. Read the stop-word list from HDFS and store it in the
    stopWords array
2. Initialize the updatedCounts variable.
    This variable stores the updated counts for words in the most recent
    tuples processed by the operator. Because it is transient, its value is
    lost when the operator fails.

### Begin Window call

The beginWindow call signals the start of an application window. For the
Word Count operator, when sendPerTuple is set to false we expect the updated
counts for the most recent window of data. Hence, we clear the updatedCounts
variable in the beginWindow call and accumulate counts until the endWindow call.

### Process Tuple call

The processTuple method is called by the process method of the input
port named input. This method defines the processing logic for the current
tuple received at the input port. In this method, we identify the words in
the current tuple and update the globalCounts and updatedCounts variables.
In addition, if sendPerTuple is set to true, we emit the words and their
counts in updatedCounts to the output port. Note that in this case
(sendPerTuple = true), we clear the updatedCounts variable in every call to
processTuple.

### End Window call

This call signals the end of an application window. For the Word
Count operator, we emit the updatedCounts to the output port if the
sendPerTuple flag is set to false.

### Teardown call

This method allows the operator to shut itself down gracefully after
releasing the resources it has acquired. In our operator,
we call the shutDown method, which shuts down the operator along with any
downstream operators.

Testing your Operator
---------------------

To test our operator, we check the following two facets:

1. The output of the operator after processing a single tuple
2. The output of the operator after processing a window of tuples

The unit tests for the WordCount operator are available in the class
WordCountOperatorTest.java.
We simulate the behavior of the engine by
using the test utilities provided by the Apache Apex libraries. We simulate
the setup, beginWindow, input port process, and endWindow calls, and compare
the output received at the simulated output ports.

When the engine restores an operator from a checkpoint, it performs the
following steps:

1. Invoke the constructor; the non-transient fields are initialized.
2. Copy the state from the checkpoint; the values initialized in step 1 are replaced.
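The two recovery steps above can be illustrated with a small, dependency-free Java sketch. It is only an analogy under stated assumptions. CountingOperator and CheckpointSketch.restore are hypothetical names. The reflection-based field copy stands in for the engine's actual checkpoint deserialization, which works differently (it restores from serialized state, not from a live object). The sketch shows why a transient field such as updatedCounts must be created in setup. After the restore, the checkpointed globalCounts values are back, but updatedCounts is whatever the constructor left it as (here, null).

``` java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WordCountOperator with one checkpointed and one transient field.
class CountingOperator
{
  Map<String, Long> globalCounts;            // non-transient: part of the checkpoint
  transient Map<String, Long> updatedCounts; // transient: never checkpointed

  CountingOperator()
  {
    // Step 1: the constructor initializes the non-transient state.
    globalCounts = new HashMap<>();
  }

  void setup()
  {
    // Transient state is not restored from the checkpoint, so it is created here.
    updatedCounts = new HashMap<>();
  }
}

// Hypothetical helper that mimics the restore order only; it is not the Apex engine's mechanism.
final class CheckpointSketch
{
  private CheckpointSketch()
  {
  }

  static CountingOperator restore(CountingOperator checkpointed)
  {
    CountingOperator fresh = new CountingOperator(); // step 1: constructor runs
    try {
      for (Field field : CountingOperator.class.getDeclaredFields()) {
        int modifiers = field.getModifiers();
        if (Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers)) {
          continue; // only non-transient instance fields belong to the checkpoint
        }
        field.setAccessible(true);
        // Step 2: the checkpointed value replaces the one set by the constructor
        // (a shallow copy, which is enough for this illustration).
        field.set(fresh, field.get(checkpointed));
      }
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
    return fresh;
  }
}
```

Calling setup() on the restored instance then creates an empty updatedCounts, mirroring the setup tasks described in the Setup call section.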

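The sketch below is a minimal plain-Java version of the Word Count logic. It can be driven the same way the unit tests drive the operator: setup, then beginWindow, then one call per tuple, then endWindow. It deliberately avoids the Apex API, so it illustrates the described behavior rather than reproducing the tutorial's WordCountOperator. The following are all assumptions:

- the class name WordCountSketch
- the emitted list standing in for the output port
- passing the stop words directly to setup instead of reading them from HDFS
- using a Set instead of a String[] for the stop words
- splitting tuples on non-word characters

``` java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical, dependency-free model of the WordCountOperator lifecycle.
class WordCountSketch
{
  private boolean sendPerTuple = true;                            // default, as in the tutorial
  private final Map<String, Long> globalCounts = new HashMap<>(); // would be checkpointed
  private transient Set<String> stopWords;                        // rebuilt in setup()
  private transient Map<String, Long> updatedCounts;              // rebuilt in setup()

  // Stand-in for the output port: every emitted <word, count> pair is appended here.
  final List<Map.Entry<String, Long>> emitted = new ArrayList<>();

  void setSendPerTuple(boolean sendPerTuple)
  {
    this.sendPerTuple = sendPerTuple;
  }

  // The stop words are passed in directly instead of being read from HDFS.
  void setup(List<String> stopWordList)
  {
    stopWords = new HashSet<>();
    for (String word : stopWordList) {
      stopWords.add(word.toLowerCase(Locale.ROOT));
    }
    updatedCounts = new HashMap<>();
  }

  void beginWindow(long windowId)
  {
    updatedCounts.clear(); // start accumulating this window's updates
  }

  void processTuple(String tuple)
  {
    if (sendPerTuple) {
      updatedCounts.clear(); // per-tuple mode only reports this tuple's words
    }
    // Case-insensitive tokenization on non-word characters (an assumption).
    for (String word : tuple.toLowerCase(Locale.ROOT).split("\\W+")) {
      if (word.isEmpty() || stopWords.contains(word)) {
        continue;
      }
      long count = globalCounts.merge(word, 1L, Long::sum);
      updatedCounts.put(word, count); // keep only the latest count per word
    }
    if (sendPerTuple) {
      emitUpdatedCounts();
    }
  }

  void endWindow()
  {
    if (!sendPerTuple) {
      emitUpdatedCounts();
    }
  }

  private void emitUpdatedCounts()
  {
    for (Map.Entry<String, Long> entry : updatedCounts.entrySet()) {
      emitted.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
    }
  }
}
```

Use the stop words "on", "a", and "had" and feed the two example tuples in per-tuple mode. The first tuple emits humpty=1, dumpty=1, sat=1, wall=1. The second emits humpty=2, dumpty=2, great=1, fall=1. These match the counts in the Functionality section. With setSendPerTuple(false), nothing is emitted until endWindow. At that point it emits the latest count of each of the six words seen in the window.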