Repository: apex-core
Updated Branches:
  refs/heads/master 01eb7926d -> ca1a375f9


APEXCORE-660 Added documentation for custom control tuple support


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ca1a375f
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ca1a375f
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ca1a375f

Branch: refs/heads/master
Commit: ca1a375f983be4876e85719c85c8e06dab129593
Parents: 01eb792
Author: bhupeshchawda <[email protected]>
Authored: Mon Mar 6 18:05:07 2017 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Thu Apr 13 20:38:55 2017 +0530

----------------------------------------------------------------------
 docs/control_tuples.md       | 207 ++++++++++++++++++++++++++++++++++++++
 docs/operator_development.md |  18 +++-
 mkdocs.yml                   |   1 +
 3 files changed, 221 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/ca1a375f/docs/control_tuples.md
----------------------------------------------------------------------
diff --git a/docs/control_tuples.md b/docs/control_tuples.md
new file mode 100644
index 0000000..f1ac518
--- /dev/null
+++ b/docs/control_tuples.md
@@ -0,0 +1,207 @@
+# User Defined Control Tuples
+
+## Introduction
+
+Custom control tuple support in Apache Apex lets applications insert user 
defined control tuples into the data flow. The engine already uses a few 
pre-defined control tuples such as BEGIN_WINDOW and END_WINDOW, but until now 
applications could not insert their own.
+
+## Terminology
+All discussion in this document is related to Control Tuples generated by user 
defined logic. The document may refer to these tuples as *Control Tuples*, 
*User Defined Control Tuples* or *Custom Control Tuples* interchangeably.
+
+### Definition
+A user defined control tuple can be any object which implements the 
`ControlTuple` interface.
+
+See [Delivery Semantics](#delivery-semantics) for details on `DeliveryType`.
+```
+public interface ControlTuple
+{
+  DeliveryType getDeliveryType();
+
+  enum DeliveryType
+  {
+    IMMEDIATE,
+    END_WINDOW
+  }
+}
+
+```
+
+Example user defined control tuple:
+```
+public class TestControlTuple implements ControlTuple
+{
+  public long data;
+  public boolean immediate;
+
+  // For Kryo
+  public TestControlTuple()
+  {
+    data = 0;
+  }
+
+  // Constructor
+  public TestControlTuple(long data, boolean immediate)
+  {
+    this.data = data;
+    this.immediate = immediate;
+  }
+
+  @Override
+  public DeliveryType getDeliveryType()
+  {
+    if (immediate) {
+      return DeliveryType.IMMEDIATE;
+    } else {
+      return DeliveryType.END_WINDOW;
+    }
+  }
+}
+```
+
+
+## Use cases
+A control tuple may be used in an application to trigger some sort of action 
in a downstream operator. For example, the source operator might want to notify 
the last operator that it has emitted all the data in a file and that the file 
has now ended. Let's call this an *End-Of-File* control tuple. Once the last 
operator gets the *End-Of-File* tuple, it would, say, close the destination 
file it was writing and create a new file.
+
+Other use cases discussed while gathering the requirements for this feature 
are as follows:
+
+1. **Batch support** - We need to tell all operators of the physical DAG when a
+batch starts and ends, so the operators can do whatever is needed upon
+the start or the end of a batch.
+2. **Watermark** - To support the concepts of event time windowing, the
+watermark control tuple is needed to identify late windows.
+3. **Changing operator properties** - We do have the support of changing
+operator properties on the fly, but with a custom control tuple, the
+command to change operator properties can be window aligned for all
+partitions and also across the DAG. In other words, the properties of *all* 
physical partitions can be aligned to a particular window. In case the behavior 
of the application needs to change, we may also be able to change properties of 
multiple logical operators aligned to a particular window.
+4. **Recording tuples** - Like changing operator properties, we do have this
+support now but only at the individual physical operator level, and without
+control of which window to record tuples for. With a custom control tuple,
+because a control tuple must belong to a window, all operators in the DAG
+can start (and stop) recording for the same windows.
+
+
+## Usage
+
+###  Generating a Control Tuple
+Any operator in the DAG may generate a control tuple. An operator which needs 
to do so should declare an output port of type `ControlAwareDefaultOutputPort` 
and call the `emitControl(ControlTuple t)` method on this port.
+
+Example: In the code snippet below, the `Generator` operator declares a 
`ControlAwareDefaultOutputPort` called `output` which can emit a data tuple as 
well as a control tuple.
+
+```
+public class Generator extends BaseOperator implements InputOperator
+{
+  private double data; // double, so that data++ can be emitted on the Double port
+  private long count;
+
+  public final transient ControlAwareDefaultOutputPort<Double> output =
+      new ControlAwareDefaultOutputPort<>();
+
+  @Override
+  public void emitTuples()
+  {
+    // Can emit a data tuple using output.emit()
+    output.emit(data++);
+    count++;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // Emit a control tuple; immediate = false selects END_WINDOW delivery
+    output.emitControl(new TestControlTuple(count, false));
+  }
+}
+```
+
+**Note** - User defined control tuples and control aware ports require an 
apex-core dependency with control tuple support, i.e. 3.6.0 or above. An 
application which uses them cannot run on earlier versions of apex-core and 
would crash at launch time.
+
+### Receiving a Control Tuple
+A downstream operator which wants to receive user defined control tuples 
should declare an input port which is *Control Aware*. A 
`ControlAwareDefaultInputPort` can process control tuples in addition to 
regular data tuples.
+
+Example: The code snippet below illustrates the use of the `processControl` 
method of `ControlAwareDefaultInputPort` to receive and handle user defined 
control tuples.
+
+```
+public final transient ControlAwareDefaultInputPort<Double> input =
+    new ControlAwareDefaultInputPort<Double>()
+{
+  // Process a data tuple
+  @Override
+  public void process(Double tuple)
+  {
+    output.emit(tuple);
+  }
+
+  // Process a control tuple
+  @Override
+  public boolean processControl(ControlTuple userControlTuple)
+  {
+    // process control tuple here
+    // The return value tells the engine whether the operator handles
+    // propagation itself (true) or the engine should propagate the tuple
+    // automatically to downstream operators (false). Discussed in later sections
+    return false;
+  }
+};
+
+```
+Note that the pre-defined control tuples like `BEGIN_WINDOW` and `END_WINDOW` 
are not handled by the `processControl()` method, since these are used only by 
the engine and are not meant to be delivered to user logic in operators. Custom 
control tuples, on the other hand, are generated by the operators and need to 
be delivered to downstream operators.
+
+#### Return value of `processControl`
+
+The return value has the following semantics:
+
+1. `true` - The operator handles propagation explicitly; the engine does not 
forward the tuple.
+2. `false` - The operator does not handle propagation; the engine forwards 
the tuple automatically.
+
+See [Propagation of Control Tuples](#propagation-of-control-tuples) for more 
details.
+
+### Serialization requirements
+A control tuple generated by some operator of the application needs to 
traverse the same path as that traversed by other data tuples transmitted by 
the application. For this reason, similar to the other data tuples, the control 
tuple needs to be Kryo serializable since the default serializer used by the 
platform is Kryo.
+
+
+## Propagation of Control Tuples
+
+A control tuple emitted by an operator can be propagated downstream 
automatically. This is in line with the automatic propagation of other 
pre-defined control tuples in the engine. However, some use cases require that 
the control tuple not be propagated further in the DAG. We support this 
behavior for user defined control tuples.
+
+Once the control tuple is processed in the `processControl` method, a return 
value is expected by the engine. This return value indicates whether or not the 
operator wishes to handle the propagation of the control tuple or let the 
engine proceed with the default auto-propagation of the control tuple.
+
+The `processControl` method of the `ControlAwareDefaultInputPort` returns a 
boolean.
+
+```
+@Override
+public boolean processControl(ControlTuple userControlTuple)
+{
+  // process userControlTuple here
+  // return true if operator wants to propagate explicitly or block propagation
+  return false; // false: engine propagates automatically
+}
+```
+
+### Non - *Control Aware* ports
+For operators without *Control Aware* ports, the platform will forward the 
control tuples to the downstream operators automatically. The application 
writer / user does not have to worry about handling a Control tuple which is 
generated upstream. Only operators with *Control Aware* ports would be 
delivered the control tuple via the `processControl` method.
+This also allows the existing operators to be backward compatible.
+
+## Delivery Semantics
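+Delivery semantics refer to the point in time, relative to the processing 
window, at which a control tuple is delivered to the operator. An operator has 
various callbacks like `setup`, `beginWindow`, `endWindow`, etc., and the 
delivery type determines where among these callbacks the control tuple arrives.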
+
+### DeliveryType IMMEDIATE
+As the name implies, the control tuple is delivered immediately to the next 
downstream operator if that operator is control aware; otherwise it is 
forwarded on to the operator after it.
+
+* **Case: Downstream is partitioned**  
+When the downstream is partitioned, the control tuple with *IMMEDIATE* 
delivery type would go to all the downstream partitions. This holds, 
irrespective of whether or not the control tuple was generated by the 
immediately upstream operator or even further upstream.
+
+* **Case: Upstream is partitioned**  
+When the upstream is partitioned and the control tuple is generated in any 
subset of the partitions, the downstream operator receives the control tuple 
immediately and does not wait till the end of the current window. In case the 
source for the control tuple was a single source further upstream and multiple 
copies were generated by the intermediate partitions, the duplicate copies of 
the control tuple are filtered out at the downstream operator. Thus only unique 
control tuples are delivered to the downstream operator. Further, in case of 
*IMMEDIATE* delivery, the first instance of the control tuple is delivered to 
the operator and the duplicates are filtered out.
+
+### DeliveryType END_WINDOW
+This delivery type only delivers the control tuple to the operator after all 
data tuples have been delivered to the operator. In the operator lifecycle, 
this would mean that the control tuples would be delivered just before the 
`endWindow` call.
+
+* **Case: Downstream is partitioned**  
+  When the downstream is partitioned, the control tuple emitted by the 
upstream is broadcast to the downstream partitions, buffered there until the 
end of the window, and delivered to the operator just before the `endWindow` 
call.
+
+* **Case: Upstream is partitioned**  
+  If the control tuples are generated in any subset of the partitions, then 
each control tuple is unique and is delivered to the downstream operator 
before the `endWindow` call. However, if the source for the control tuple is a 
source further upstream, then the downstream operator filters out duplicates 
as and when each control tuple arrives at the operator, and finally all unique 
control tuples are delivered to the operator just before the `endWindow` call.
+## Assumptions
+All the user defined control tuples used in the application are cached in the 
memory of the operator for the duration of a window. For this reason, it is 
imperative that both the size and the number of control tuples emitted within 
a window are small compared to the number of data tuples.
+
+## JIRA
+* [APEXCORE-579](https://issues.apache.org/jira/browse/APEXCORE-579) points to 
the top level JIRA issue for control tuple support.
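
Note on the Delivery Semantics section of docs/control_tuples.md above: the rules it describes for one window can be modelled in a few lines of plain Java. This is only a standalone sketch, assuming that duplicates are recognised by a string id; `DeliverySketch`, `Control` and `WindowModel` are hypothetical names and not apex-core classes, and the engine's actual buffering and de-duplication may well be implemented differently.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Standalone model of the delivery rules; none of these types are Apex classes.
class DeliverySketch
{
  enum DeliveryType { IMMEDIATE, END_WINDOW }

  // Hypothetical control tuple; "id" is the assumed identity used for de-duplication.
  static final class Control
  {
    final String id;
    final DeliveryType type;

    Control(String id, DeliveryType type)
    {
      this.id = id;
      this.type = type;
    }
  }

  // Models one control aware input port for the duration of one window.
  static final class WindowModel
  {
    private final Set<String> seen = new HashSet<>();
    private final List<Control> buffered = new ArrayList<>();
    final List<String> callbacks = new ArrayList<>();

    void data(String tuple)
    {
      callbacks.add("process:" + tuple);
    }

    void control(Control c)
    {
      if (!seen.add(c.id)) {
        return; // copy of a tuple already seen in this window: filtered out
      }
      if (c.type == DeliveryType.IMMEDIATE) {
        callbacks.add("processControl:" + c.id); // first instance, delivered at once
      } else {
        buffered.add(c); // held back until the end of the window
      }
    }

    void endWindow()
    {
      for (Control c : buffered) {
        callbacks.add("processControl:" + c.id); // delivered just before endWindow
      }
      buffered.clear();
      seen.clear();
      callbacks.add("endWindow");
    }
  }

  public static void main(String[] args)
  {
    WindowModel port = new WindowModel();
    port.control(new Control("eof", DeliveryType.END_WINDOW));
    port.data("a");
    port.control(new Control("flush", DeliveryType.IMMEDIATE));
    port.control(new Control("flush", DeliveryType.IMMEDIATE)); // duplicate from a second partition
    port.data("b");
    port.endWindow();
    System.out.println(port.callbacks);
  }
}
```

Running `main` prints `[process:a, processControl:flush, process:b, processControl:eof, endWindow]`: the IMMEDIATE tuple is delivered once, between the data tuples, while the END_WINDOW tuple waits until just before `endWindow`.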

http://git-wip-us.apache.org/repos/asf/apex-core/blob/ca1a375f/docs/operator_development.md
----------------------------------------------------------------------
diff --git a/docs/operator_development.md b/docs/operator_development.md
index f3390c9..926fabc 100644
--- a/docs/operator_development.md
+++ b/docs/operator_development.md
@@ -15,6 +15,7 @@ its internals. This document is intended to serve the 
following purposes
 1.  **[Apache Apex Operators](#apex_operators)** - Introduction to operator 
terminology and concepts.
 2.  **[Writing Custom Operators](#writing_custom_operators)** - Designing, 
coding and testing new operators from scratch.  Includes code examples.
 3.  **[Operator Reference](#operator_reference)** - Details of operator 
internals, lifecycle, and best practices and optimizations.
+4.  **[Advanced Features](#advanced_features)** - Advanced capabilities 
available to operator developers.
 
 * * * * *
 
@@ -64,8 +65,8 @@ Types of Operators
 An operator works on one tuple at a time. These tuples may be supplied
 by other operators in the application or by external sources,
 such as a database or a message bus. Similarly, after the tuples are
-processed, these may be passed on to other operators, or stored into an 
external system. 
-Therea are 3 type of operators based on function: 
+processed, these may be passed on to other operators, or stored into an 
external system.
+There are 3 types of operators based on function:
 
 1.  **Input Adapter** - This is one of the starting points in
     the application DAG and is responsible for getting tuples from an
@@ -384,9 +385,9 @@ globalCounts = Maps.newHashMap();
 ```
 ### Setup call
 
-The setup method is called only once during an operator lifetime and its 
purpose is to allow 
+The setup method is called only once during an operator lifetime and its 
purpose is to allow
 the operator to set itself up for processing incoming streams. Transient 
objects in the operator are
-not serialized and checkpointed. Hence, it is essential that such objects 
initialized in the setup call. 
+not serialized and checkpointed. Hence, it is essential that such objects 
are initialized in the setup call.
 In case of operator failure, the operator will be redeployed (most likely on a 
different container). The setup method called by the Apache Apex engine allows 
the operator to prepare for execution in the new container.
 
 The following tasks are executed as part of the setup call:
@@ -399,7 +400,7 @@ The following tasks are executed as part of the setup call:
 
 ### Begin Window call
 
-The begin window call signals the start of an application window. With 
+The begin window call signals the start of an application window. With
 regards to Word Count Operator, we are expecting updated counts for the most 
recent window of
 data if the sendPerTuple is set to false. Hence, we clear the updatedCounts 
variable in the begin window
 call and start accumulating the counts till the end window call.
@@ -448,6 +449,13 @@ ports.
 2. Copy state from checkpoint -- initialized values from step 1 are
 replaced.
 
+Advanced Features <a name="advanced_features"></a>
+====================================
+
+Control Tuple Support
+---------------------------
+Operators can now emit their own control tuples. These are distinct from the 
control tuples used by the engine, such as BEGIN_WINDOW and END_WINDOW, and can 
be used to notify downstream operators of an application level event, for 
example BEGIN_FILE or END_FILE.
+More details can be found at [Control Tuples](../control_tuples).
 
 Malhar Operator Library
 ==========================
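
Note on the Control Tuple Support section added above: the linked document says that a control tuple is forwarded automatically past operators without control aware ports, and that the boolean returned by `processControl` decides whether the engine keeps forwarding it. A minimal standalone sketch of that rule for a linear chain of operators follows. `PropagationSketch`, `Stage` and `send` are hypothetical names, not apex-core API, and returning `true` is modelled here as the operator choosing to block propagation (a real operator could instead re-emit the tuple itself).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Standalone model of control tuple propagation; not based on Apex classes.
class PropagationSketch
{
  // One operator in a linear chain. A null handler models an operator
  // without control aware ports.
  static final class Stage
  {
    final String name;
    final Predicate<String> handler;

    Stage(String name, Predicate<String> handler)
    {
      this.name = name;
      this.handler = handler;
    }
  }

  // Returns the names of the operators whose processControl would be invoked.
  static List<String> send(String controlTuple, List<Stage> chain)
  {
    List<String> invoked = new ArrayList<>();
    for (Stage stage : chain) {
      if (stage.handler == null) {
        continue; // not control aware: the tuple is forwarded automatically
      }
      invoked.add(stage.name);
      if (stage.handler.test(controlTuple)) {
        break; // true: operator took over propagation (modelled as blocking)
      }
      // false: automatic forwarding continues to the next operator
    }
    return invoked;
  }

  public static void main(String[] args)
  {
    List<Stage> chain = Arrays.asList(
        new Stage("parser", null),         // no control aware port
        new Stage("dedup", t -> false),    // handles the tuple, lets it propagate
        new Stage("writer", t -> true),    // handles the tuple, stops propagation
        new Stage("audit", t -> false));   // never sees the tuple
    System.out.println(send("END_FILE", chain));
  }
}
```

Running `main` prints `[dedup, writer]`: `parser` is skipped because it is not control aware, and `audit` never receives the tuple because `writer` returned `true`.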

http://git-wip-us.apache.org/repos/asf/apex-core/blob/ca1a375f/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index c10f352..e582ec3 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -9,6 +9,7 @@ pages:
     - Packages: application_packages.md
     - Operators: operator_development.md
     - AutoMetric API: autometrics.md
+    - Custom Control Tuples: control_tuples.md
     - Best Practices: development_best_practices.md 
 - Operations:
     - Apex CLI : apex_cli.md
