Repository: apex-core
Updated Branches:
  refs/heads/master 8b55fbe91 -> 70ceae3db


Added best practices for application and operator development, also closes 
apache/apex-malhar#350


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/70ceae3d
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/70ceae3d
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/70ceae3d

Branch: refs/heads/master
Commit: 70ceae3db50f9183862748bf9c81cbe17d588ffa
Parents: 8b55fbe
Author: Pramod Immaneni <[email protected]>
Authored: Sat Sep 3 16:44:41 2016 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Tue Sep 6 16:21:24 2016 -0700

----------------------------------------------------------------------
 docs/development_best_practices.md | 132 ++++++++++++++++++++++++++++++++
 mkdocs.yml                         |   1 +
 2 files changed, 133 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/70ceae3d/docs/development_best_practices.md
----------------------------------------------------------------------
diff --git a/docs/development_best_practices.md 
b/docs/development_best_practices.md
new file mode 100644
index 0000000..815d431
--- /dev/null
+++ b/docs/development_best_practices.md
@@ -0,0 +1,132 @@
+# Development Best Practices
+
+This document describes the best practices to follow when developing operators 
and other application components such as partitoners, stream codecs etc on the 
Apache Apex platform.
+
+## Operators
+
+These are general guidelines for all operators that are covered in the current 
section. The subsequent sections talk about special considerations for input 
and output operators.
+
+* When writing a new operator to be used in an application, consider breaking 
it down into
+       * An abstract operator that encompasses the core functionality but 
leaves application specific schemas and logic to the implementation.
+       * An optional concrete operator also in the library that extends the 
abstract operator and provides commonly used schema types such as strings, 
byte[] or POJOs.
+* Follow these conventions for the life cycle methods:
+       * Do one time initialization of entities that apply for the entire 
lifetime of the operator in the **setup** method, e.g., factory 
initializations. Initializations in **setup** are done in the container where 
the operator is deployed. Allocating memory for fields in the constructor is 
not efficient as it would lead to extra garbage in memory for the following 
reason. The operator is instantiated on the client from where the application 
is launched, serialized and started one of the Hadoop nodes in a container. So 
the constructor is first called on the client and if it were to initialize any 
of the fields, that state would be saved during serialization. In the Hadoop 
container the operator is deserialized and started. This would invoke the 
constructor again, which will initialize the fields but their state will get 
overwritten by the serialized state and the initial values would become garbage 
in memory.
+       * Do one time initialization for live entities in **activate** method, 
e.g., opening connections to a database server or starting a thread for 
asynchronous operations. The **activate** method is called right before 
processing starts so it is a better place for these initializations than at 
**setup** which can lead to a delay before processing data from the live 
entity.  
+       * Perform periodic tasks based on processing time in application window 
boundaries.
+       * Perform initializations needed for each application window in 
**beginWindow**.
+       * Perform aggregations needed for each application window  in 
**endWindow**.
+       * Teardown of live entities (inverse of tasks performed during 
activate) should be in the **deactivate** method.
+       * Teardown of lifetime entities (those initialized in setup method) 
should happen in the **teardown** method.
+       * If the operator implementation is not finalized mark it with the 
**@Evolving** annotation.
+* If the operator needs to perform operations based on event time of the 
individual tuples and not the processing time, extend and use the 
**WindowedOperator**. Refer to documentation of that operator for details on 
how to use it.
+* If an operator needs to do some work when it is not receiving any input, it 
should implement **IdleTimeHandler** interface. This interface contains 
**handleIdleTime** method which will be called whenever the platform isn’t 
doing anything else and the operator can do the work in this method. If for any 
reason the operator does not have any work to do when this method is called, it 
should sleep for a small amount of time such as that specified by the 
**SPIN\_MILLIS** attribute so that it does not cause a busy wait when called 
repeatedly by the platform. Also, the method should not block and return in a 
reasonable amount of time that is less than the streaming window size (which is 
500ms by default).
+* Often operators have customizable parameters such as information about 
locations of external systems or parameters that modify the behavior of the 
operator. Users should be able to specify these easily without having to change 
source code. This can be done by making them properties of the operator because 
they can then be initialized from external properties files.
+       * Where possible default values should be provided for the properties 
in the source code.
+       * Validation rules should be specified for the properties using javax 
constraint validations that check whether the values specified for the 
properties are in the correct format, range or other operator requirements. 
Required properties should have at least a **@NotNull** validation specifying 
that they have to be specified by the user.
+
+### Checkpointing
+
+Checkpointing is a process of snapshotting the state of an operator and saving 
it so that in case of failure the state can be used to restore the operator to 
a prior state and continue processing. It is automatically performed by the 
platform at a configurable interval. All operators in the application are 
checkpointed in a distributed fashion, thus allowing the entire state of the 
application to be saved and available for recovery if needed. Here are some 
things to remember when it comes to checkpointing:
+
+* The process of checkpointing involves snapshotting the state by serializing 
the operator and saving it to a store. This is done using a **StorageAgent**. 
By default a *StorageAgent* is already provided by the platform and it is 
called **AsyncFSStorageAgent**. It serializes the operator using Kryo and saves 
the serialized state asynchronously to a filesystem such as HDFS. There are 
other implementations of *StorageAgent* available such as 
**GeodeKeyValueStorageAgent** that stores the serialized state in Geode which 
is an in-memory replicated data grid.
+* All variables in the operator marked neither transient nor final are saved 
so any variables in the operator that are not part of the state should be 
marked transient. Specifically any variables like connection objects, i/o 
streams, ports are transient, because they need to be setup again on failure 
recovery.
+* If the operator does not keep any state between windows, mark it with the 
**@Stateless** annotation. This results in efficiencies during checkpointing 
and recovery. The operator will not be checkpointed and is always restored to 
the initial state
+* The checkpoint interval can be set using the **CHECKPOINT\_WINDOW\_COUNT** 
attribute which specifies the interval in terms of number of streaming windows.
+* If the correct functioning of the operator requires the **endWindow** method 
be called before checkpointing can happen, then the checkpoint interval should 
align with application window interval i.e., it should be a multiple of 
application window interval. In this case the operator should be marked with 
**OperatorAnnotation** and **checkpointableWithinAppWindow** set to false. If 
the window intervals are configured by the user and they don’t align, it will 
result in a DAG validation error and application won’t launch.
+* In some cases the operator state related to a piece of data needs to be 
purged once that data is no longer required by the application, otherwise the 
state will continue to build up indefinitely. The platform provides a way to 
let the operator know about this using a callback listener called 
**CheckpointNotificationListener**. This listener has a callback method called 
**committed**, which is called by the platform from time to time with a window 
id that has been processed successfully by all the operators in the DAG and 
hence is no longer needed. The operator can delete all the state corresponding 
to window ids less than or equal to the provided window id.
+* Sometimes operators need to perform some tasks just before checkpointing. 
For example, filesystem operators may want to flush the files just before 
checkpoint so they can be sure that all pending data is written to disk and no 
data is lost if there is an operator failure just after the checkpoint and the 
operator restarts from the checkpoint. To do this the operator would implement 
the same *CheckpointNotificationListener* interface and implement the 
**beforeCheckpoint** method where it can do these tasks.
+* If the operator is going to have a large state, checkpointing the entire 
state each time becomes unviable. Furthermore, the amount of memory needed to 
hold the state could be larger than the amount of physical memory available. In 
these cases the operator should checkpoint the state incrementally and also 
manage the memory for the state more efficiently. The platform provides a 
utiltiy called **ManagedState** that uses a combination of in memory and disk 
cache to efficiently store and retrieve data in a performant, fault tolerant 
way and also checkpoint it in an incremental fashion. There are operators in 
the platform that use *ManagedState* and can be used as a reference on how to 
use this utility such as Dedup or Join operators.
+
+## Input Operators
+
+Input operators have additional requirements:
+
+* The **emitTuples** method implemented by the operator, is called by the 
platform, to give the operator an opportunity to emit some data. This method is 
always called within a window boundary but can be called multiple times within 
the same window. There are some important guidelines on how to implement this 
method:
+       * This should not be a blocking method and should return in a 
reasonable time that is less than the streaming window size (which is 500ms by 
default). This also applies to other callback methods called by the platform 
such as *beginWindow*, *endWindow* etc., but is more important here since this 
method will be called continuously by the platform.
+       * If the operator needs to interact with external systems to obtain 
data and this can potentially take a long time, then this should be performed 
asynchronously in a different thread. Refer to the threading section below for 
the guidelines when using threading.
+       * In each invocation, the method can emit any number of data tuples.
+
+### Idempotence
+
+Many applications write data to external systems using output operators. To 
ensure that data is present exactly once in the external system even in a 
failure recovery scenario, the output operators expect the replayed windows 
during recovery contain the same data as before the failure. This is called 
idempotency. Since operators within the DAG are merely responding to input data 
provided to them by the upstream operators and the input operator has no 
upstream operator, the responsibility of idempotent replay falls on the input 
operators.
+
+* For idempotent replay of data, the operator needs to store some 
meta-information for every window that would allow it to identify what data was 
sent in that window. This is called the idempotent state.
+       * If the external source of the input operator allows replayability, 
this could be information such as offset of last piece of data in the window, 
an identifier of the last piece of data itself or number of data tuples sent.
+       * However if the external source does not allow replayability from an 
operator specified point, then the entire data sent within the window may need 
to be persisted by the operator.
+* The platform provides a utility called *WindowDataManager* to allow 
operators to save and retrieve idempotent state every window. Operators should 
use this to implement idempotency.
+
+## Output Operators
+
+Output operators typically connect to external storage systems such as 
filesystems, databases or key value stores to store data.
+
+* In some situations, the external systems may not be functioning in a 
reliable fashion. They may be having prolonged outages or performance problems. 
If the operator is being designed to work in such environments, it needs to be 
able to to handle these problems gracefully and not block the DAG or fail. In 
these scenarios the operator should cache the data into a local store such as 
HDFS and interact with external systems in a separate thread so as to not have 
problems in the operator lifecycle thread. This pattern is called the 
**Reconciler** pattern and there are operators that implement this pattern 
available in the library for reference.
+
+### End-to-End Exactly Once
+
+When output operators store data in external systems, it is important that 
they do not lose data or write duplicate data when there is a failure event and 
the DAG recovers from that failure. In failure recovery, the windows from the 
previous checkpoint are replayed and the operator receives this data again. The 
operator should ensure that it does not write this data again. Operator 
developers should figure out how to do this specifically for the operators they 
are developing depending on the logic of the operators. Below are examples of 
how a couple of existing output operators do this for reference.
+
+* File output operator that writes data to files keeps track of the file 
lengths in the state. These lengths are checkpointed and restored on failure 
recovery. On restart, the operator truncates the file to the length equal to 
the length in the recovered state. This makes the data in the file same as it 
was at the time of checkpoint before the failure. The operator now writes the 
replayed data from the checkpoint in regular fashion as any other data. This 
ensures no data is lost or duplicated in the file.
+* The JDBC output operator that writes data to a database table writes the 
data in a window in a single transaction. It also writes the current window id 
into a meta table along with the data as part of the same transaction. It 
commits the transaction at the end of the window. When there is an operator 
failure before the final commit, the state of the database is that it contains 
the data from the previous fully processed window and its window id since the 
current window transaction isn’t yet committed. On recovery, the operator 
reads this window id back from the meta table. It ignores all the replayed 
windows whose window id is less than or equal to the recovered window id and 
thus ensures that it does not duplicate data already present in the database. 
It starts writing data normally again when window id of data becomes greater 
than recovered window thus ensuring no data is lost.
+
+## Partitioning
+
+Partitioning allows an operation to be scaled to handle more pieces of data 
than before but with a similar SLA. This is done by creating multiple instances 
of an operator and distributing the data among them. Input operators can also 
be partitioned to stream more pieces of data into the application. The platform 
provides a lot of flexibility and options for partitioning. Partitioning can 
happen once at startup or can be dynamically changed anytime while the 
application is running, and it can be done in a stateless or stateful way by 
distributing state from the old partitions to new partitions.
+
+In the platform, the responsibility for partitioning is shared among different 
entities. These are:
+ 
+1. A **partitioner** that specifies *how* to partition the operator, 
specifically it takes an old set of partitions and creates a new set of 
partitions. At the start of the application the old set has one partition and 
the partitioner can return more than one partitions to start the application 
with multiple partitions. The partitioner can have any custom JAVA logic to 
determine the number of new partitions, set their initial state as a brand new 
state or derive it from the state of the old partitions. It also specifies how 
the data gets distributed among the new partitions. The new set doesn't have to 
contain only new partitions, it can carry over some old partitions if desired.
+2. An optional **statistics (stats) listener** that specifies *when* to 
partition. The reason it is optional is that it is needed only when dynamic 
partitioning is needed. With the stats listener, the stats can be used to 
determine when to partition.
+3. In some cases the *operator* itself should be aware of partitioning and 
would need to provide supporting code.
+       * In case of input operators each partition should have a property or a 
set of properties that allow it to distinguish itself from the other partitions 
and fetch unique data.
+4. When an operator that was originally a single instance is split into 
multiple partitions with each partition working on a subset of data, the 
results of the partitions may need to be combined together to compute the final 
result. The combining logic would depend on the logic of the operator. This 
would be specified by the developer using a **Unifier**, which is deployed as 
another operator by the platform. If no *Unifier* is specified, the platform 
inserts a **default unifier** that merges the results of the multiple partition 
streams into a single stream. Each output port can have a different *Unifier* 
and this is specified by returning the corresponding *Unifier* in the 
**getUnifier** method of the output port. The operator developer should provide 
a custom *Unifier* wherever applicable.
+5. The Apex *engine* that brings everything together and effects the 
partitioning.
+
+Since partitioning is critical for scalability of applications, operators must 
support it. There should be a strong reason for an operator to not support 
partitioning, such as, the logic performed by the operator not lending itself 
to parallelism. In order to support partitioning, an operator developer, apart 
from developing the functionality of the operator, may also need to provide a 
partitioner, stats listener and supporting code in the operator as described in 
the steps above. The next sections delve into this. 
+
+### Out of the box partitioning
+
+The platform comes with some built-in partitioning utilities that can be used 
in certain scenarios.
+
+* **StatelessPartitioner** provides a default partitioner, that can be used 
for an operator in certain conditions. If the operator satisfies these 
conditions, the partitioner can be specified for the operator with a simple 
setting and no other partitioning code is needed. The conditions are:
+       * No dynamic partitioning is needed, see next point about dynamic 
partitioning. 
+       * There is no distinct initial state for the partitions, i.e., all 
partitions start with the same initial state submitted during application 
launch.
+       
+       Typically input or output operators do not fall into this category, 
although there are some exceptions. This partitioner is mainly used with 
operators that are in the middle of the DAG, after the input and before the 
output operators. When used with non-input operators, only the data for the 
first declared input port is distributed among the different partitions. All 
other input ports are treated as broadcast and all partitions receive all the 
data for that port.
+
+* **StatelessThroughputBasedPartitioner** in Malhar provides a dynamic 
partitioner based on throughput thresholds. Similarly 
**StatelessLatencyBasedPartitioner** provides a latency based dynamic 
partitioner in RTS. If these partitioners can be used, then separate 
partitioning related code is not needed. The conditions under which these can 
be used are:
+       * There is no distinct initial state for the partitions.
+       * There is no state being carried over by the operator from one window 
to the next i.e., operator is stateless.
+
+### Custom partitioning
+
+In many cases, operators don’t satisfy the above conditions and a built-in 
partitioner cannot be used. Custom partitioning code needs to be written by the 
operator developer. Below are guidelines for it.
+
+* Since the operator developer is providing a *partitioner* for the operator, 
the partitioning code should be added to the operator itself by making the 
operator implement the Partitioner interface and implementing the required 
methods, rather than creating a separate partitioner. The advantage is the user 
of the operator does not have to explicitly figure out the partitioner and set 
it for the operator but still has the option to override this built-in 
partitioner with a different one.
+* The *partitioner* is responsible for setting the initial state of the new 
partitions, whether it is at the start of the application or when partitioning 
is happening while the application is running as in the dynamic partitioning 
case. In the dynamic partitioning scenario, the partitioner needs to take the 
state from the old partitions and distribute it among the new partitions. It is 
important to note that apart from the checkpointed state the partitioner also 
needs to distribute idempotent state.
+* The *partitioner* interface has two methods, **definePartitions** and 
**partitioned**. The method *definePartitons* is first called to determine the 
new partitions, and if enough resources are available on the cluster, the 
*partitioned* method is called passing in the new partitions. This happens both 
during initial partitioning and dynamic partitioning. If resources are not 
available, partitioning is abandoned and existing partitions continue to run 
untouched. This means that any processing intensive operations should be 
deferred to the *partitioned* call instead of doing them in *definePartitions*, 
as they may not be needed if there are not enough resources available in the 
cluster.
+* The *partitioner*, along with creating the new partitions, should also 
specify how the data gets distributed across the new partitions. It should do 
this by specifying a mapping called **PartitionKeys** for each partition that 
maps the data to that partition. This mapping needs to be specified for every 
input port in the operator. If the *partitioner* wants to use the standard 
mapping it can use a utility method called 
**DefaultPartition.assignPartitionKeys**.
+* When the partitioner is scaling the operator up to more partitions, try to 
reuse the existing partitions and create new partitions to augment the current 
set. The reuse can be achieved by the partitioner returning the current 
partitions unchanged. This will result in the current partitions continuing to 
run untouched.
+* In case of dynamic partitioning, as mentioned earlier, a stats listener is 
also needed to determine when to re-partition. Like the *Partitioner* 
interface, the operator can also implement the *StatsListener* interface to 
provide a stats listener implementation that will be automatically used.
+* The *StatsListener* has access to all operator statistics to make its 
decision on partitioning. Apart from the statistics that the platform computes 
for the operators such as throughput, latency etc, operator developers can 
include their own business metrics by using the AutoMetric feature.
+* If the operator is not partitionable, mark it so with *OperatorAnnotation* 
and *partitionable* element set to false.
+
+### StreamCodecs
+
+A **StreamCodec** is used in partitioning to distribute the data tuples among 
the partitions. The *StreamCodec* computes an integer hashcode for a data tuple 
and this is used along with *PartitionKeys* mapping to determine which 
partition or partitions receive the data tuple. If a *StreamCodec* is not 
specified, then a default one is used by the platform which returns the JAVA 
hashcode of the tuple. 
+
+*StreamCodec* is also useful in another aspect of the application. It is used 
to serialize and deserialize the tuple to transfer it between operators. The 
default *StreamCodec* uses Kryo library for serialization. 
+
+The following guidelines are useful when considering a custom *StreamCodec*
+
+* A custom *StreamCodec* is needed if the tuples need to be distributed based 
on a criteria different from the hashcode of the tuple. If the correct working 
of an operator depends on the data from the upstream operator being distributed 
using a custom criteria such as being sticky on a “key” field within the 
tuple, then a custom *StreamCodec* should be provided by the operator 
developer. This codec can implement the custom criteria. The operator should 
also return this custom codec in the **getStreamCodec** method of the input 
port.
+* When implementing a custom *StreamCodec* for the purpose of using a 
different criteria to distribute the tuples, the codec can extend an existing 
*StreamCodec* and implement the hashcode method, so that the codec does not 
have to worry about the serialization and deserialization functionality. The 
Apex platform provides two pre-built *StreamCodec* implementations for this 
purpose, one is **KryoSerializableStreamCodec** that uses Kryo for 
serialization and another one **JavaSerializationStreamCodec** that uses JAVA 
serialization.
+* Different *StreamCodec* implementations can be used for different inputs in 
a stream with multiple inputs when different criteria of distributing the 
tuples is desired between the multiple inputs. 
+
+## Threads
+
+The operator lifecycle methods such as **setup**, **beginWindow**, 
**endWindow**, **process** in *InputPorts* are all called from a single 
operator lifecycle thread, by the platform, unbeknownst to the user. So the 
user does not have to worry about dealing with the issues arising from 
multi-threaded code. Use of separate threads in an operator is discouraged 
because in most cases the motivation for this is parallelism, but parallelism 
can already be achieved by using multiple partitions and furthermore mistakes 
can be made easily when writing multi-threaded code. When dealing with high 
volume and velocity data, the corner cases with incorrectly written 
multi-threaded code are encountered more easily and exposed. However, there are 
times when separate threads are needed, for example, when interacting with 
external systems the delay in retrieving or sending data can be large at times, 
blocking the operator and other DAG processing such as committed windows. In 
these cases the followin
 g guidelines must be followed strictly.
+
+* Threads should be started in **activate** and stopped in **deactivate**. In 
*deactivate* the operator should wait till any threads it launched, have 
finished execution. It can do so by calling **join** on the threads or if using 
**ExecutorService**, calling **awaitTermination** on the service.
+* Threads should not call any methods on the ports directly as this can cause 
concurrency exceptions and also result in invalid states.
+* Threads can share state with the lifecycle methods using data structures 
that are either explicitly protected by synchronization or are inherently 
thread safe such as thread safe queues.
+* If this shared state needs to be protected against failure then it needs to 
be persisted during checkpoint. To have a consistent checkpoint, the state 
should not be modified by the thread when it is being serialized and saved by 
the operator lifecycle thread during checkpoint. Since the checkpoint process 
happens outside the window boundary the thread should be quiesced between 
**endWindow** and **beginWindow** or more efficiently between pre-checkpoint 
and checkpointed callbacks.

http://git-wip-us.apache.org/repos/asf/apex-core/blob/70ceae3d/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index 8c3904c..c10f352 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -9,6 +9,7 @@ pages:
     - Packages: application_packages.md
     - Operators: operator_development.md
     - AutoMetric API: autometrics.md
+    - Best Practices: development_best_practices.md 
 - Operations:
     - Apex CLI : apex_cli.md
     - Security: security.md

Reply via email to