[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15293594#comment-15293594
 ] 

Siyuan Hua commented on APEXMALHAR-1939:
----------------------------------------

h1. First iteration of Java Stream API. 
Java Stream API is following the popular functional programming paradigm to 
construct an Apex Application.
The goal for this API is:

-  Easy to construct a dag
-  Easy to migrate other streaming application to Apex
-  Fully compatible with existing DAG API
-  Provide useful build-in transformations with abstracted pluggable components 
in one place

To achieve the goal and split the work, we categorize all different kind of 
transformations into 2 different types: 
- 1 input, 1+ output (map, filter, flatmap); 
- Multiple input, 1 output (Aggregations, Joins, Unions)

This first iteration is only about the first category, which is,  1 in, 1+ out. 
For transformations like this, it is just like distributed function call. So we 
abstract out some function types instead of operators. Internally, there are 
some pre-build function operators which wrap the function and connect together. 

The core interface is the ApexStream. The ApexStream is designed in a method 
chain fashion, which all transformation method returns a new ApexStream object 
with new output type.  

Here are some examples, if you want to do a filter then a map, you can do 
{code:java}
  stream.filter(new FilterFunction())
    .map(new MapFunction()) 
{code}

You can also mix this with existing operator API. For example, if you want to 
add a operator after map, you can do this 
{code}
  stream.filter(..)
    .map(..)
    .addOperator(opt, opt.input, opt.output)
// the opt.input here is to connect to the output of last stream and opt.output 
is going to be connected to the next)
{code}
If you want to set the locality or attributes for operator/ports/dag, you can 
use **with** clause, for example you want filter and map to be container local 
and you want to set checkpoint window count for the new operator you just 
added, you can do something like this
{code}
  stream.filter(..)
    .map(..).with(Locality.CONTAINER_LOCAL)
    .addOperator(..).with(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5)
    .with(someProp, someVal)
//(ps:engine will figure out which operator/ports/dag this attribute applies to)
{code}
Like the dag API, you can run the stream in a distributed mode or local mode, 
For example,
{code}
stream...populateDag(dag) //distributed mode
stream...runEmbedded(...) //local mode
{code}
The stream is implemented in a lazy build mode, which means until you call 
populateDag or runEmbedded, all the transformations and the order of them will 
be kept in memory in a graph data structure (*DagMeta*).  This will allow us to 
solve some technical difficulties such as logical plan optimization etc. 

Also the stream is flexible to extend to fit you needs in your organization. 
For example if you want to provide a filter and map transformation in one 
operator. Instead of repeating the work of connect filter and map operator 
together in a thread_local mode. You can add first-order function to 
*ApexStream* interface, by simply extending the default implementation 
*ApexStreamImpl* 

{code:title=MyStream.java}
public class MyStream<T> extends ApexStreamImpl<T>
{
  public MyStream(ApexStream<T> apexStream)
  {
    super(apexStream);
  }

  <O> MyStream<O> myFilterAndMap(Function.MapFunction<T, O> map, 
Function.FilterFunction<T> filterFunction)
  {
    return filter(filterFunction).map(map).with(DAG.Locality.THREAD_LOCAL);
  }

}

{code}

Then you can use your new Stream like this
{code}
new MyStream(stream).<..,MyStream>.flatMap(...)   // existing build-in 
transformation
  .*myFilterAndMap(...)*   // your transformation for your org
  .print()
{code}





> Stream API
> ----------
>
>                 Key: APEXMALHAR-1939
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1939
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Siyuan Hua
>            Priority: Critical
>              Labels: roadmap
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to