[ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hao Chen updated EAGLE-66:
--------------------------
    Description: 
Main Features

1. Typesafe API: Currently the stream processing API is not type-safe (type
information is erased by the stream framework), and all programming interfaces
for developers are expressed in terms of Object/AnyRef, which is neither
friendly nor extensible for framework users (application developers):

public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)

So I propose the following interface (all type information is transparent to
the developer, with no additional parameters needed, supported by Scala's
implicit TypeTag):

class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]

StreamInfo: contains core stream information, including streamId, processing
element id/name, and entity type info (class/TypeTag).
StreamProtocol extends JavaStreamProtocol: contains the basic Stream DSL API
and the Java-compatible API.

import scala.reflect.runtime.{universe => ru}

class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null

  // Entity type info; defaults to AnyRef until a concrete type is known
  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
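
To illustrate how an implicit tag can carry type information without extra
parameters, here is a minimal sketch. It is not the Eagle implementation:
TypedProducer and its map method are hypothetical names, and it uses ClassTag
(which only records the erased runtime class) instead of TypeTag so that it
needs nothing beyond the Scala standard library.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical typed producer (not Eagle's StreamProducer). The compiler
// supplies the implicit ClassTag, so callers never pass type info explicitly.
class TypedProducer[T: ClassTag](val elements: Seq[T]) {

  // Plays the role of StreamInfo.typeClass in this sketch.
  val typeClass: Class[_] = classTag[T].runtimeClass

  // Each transformation captures the tag of its result type in the same way.
  def map[R: ClassTag](f: T => R): TypedProducer[R] =
    new TypedProducer[R](elements.map(f))
}

object TypedProducerDemo {
  def main(args: Array[String]): Unit = {
    val names = new TypedProducer(Seq("a", "b"))
    val pairs = names.map(n => (n, n.length))
    println(names.typeClass.getName)
    println(pairs.typeClass.getName)
  }
}
```

A ClassTag loses type arguments (a Tuple2[String, Int] is recorded only as
Tuple2), which is presumably why the proposal keeps a TypeTag alongside the
class.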

2. Key-Value Based Structure: Currently framework users (developers) have to
handle field declarations again and again, and the framework and business
logic are tightly coupled. According to the StreamProtocol description, users
should not need to care about framework-level details such as Storm's internal
data structure (List<Object> with Fields<String>), which is not
developer-friendly. We should make sure users can focus on business logic
only, for example:

env.from(tuples)
        .groupByKey(_.name)
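
One way to read the key-value idea is sketched below, under assumptions: the
framework wraps each element together with the key produced by the user's
selector and routes on that key, so no field declaration is needed. KeyValue,
keyBy and partitionOf are hypothetical names, not Eagle or Storm APIs.

```scala
// Hypothetical key-value envelope exchanged between processing elements.
final case class KeyValue[K, V](key: K, value: V)

final case class Entity(name: String, value: Double, var inc: Int = 0)

object KeyedExchange {
  // Wrap each element with the key chosen by the user's selector.
  def keyBy[K, V](elements: Seq[V])(selector: V => K): Seq[KeyValue[K, V]] =
    elements.map(v => KeyValue(selector(v), v))

  // Route by key hash, so equal keys always land on the same partition.
  // floorMod keeps the result non-negative even for negative hash codes.
  def partitionOf[K](key: K, parallelism: Int): Int =
    Math.floorMod(key.##, parallelism)

  def main(args: Array[String]): Unit = {
    val tuples = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2))
    keyBy(tuples)(_.name).foreach { kv =>
      println(s"${kv.key} -> partition ${partitionOf(kv.key, 4)}")
    }
  }
}
```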

3. Spout grouping instead of overriding Schema: Currently, especially in use
cases like HdfsAuditLog monitoring, a developer who wants to group by a
certain key is forced to override the Schema (which is Storm-specific); this
is not good and not reusable.

4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and
environment (execution) are coupled with Storm's internal implementation,
which stands in the way of becoming a metadata-driven pipeline framework in
the future, so we should decouple them.

5. Compatible with the field-based structure of the old framework and
applications.

Sample Application

case class Entity(name:String,value:Double,var inc:Int=0)

val tuples = Seq(
        Entity("a", 1),
        Entity("a", 2),
        Entity("a", 3),
        Entity("b", 2),
        Entity("c", 3),
        Entity("d", 3)
)

val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)


// The DAG tracks element types automatically: Entity -> Tuple2 -> Tuple3

env.from(tuples)
        .groupByKey(_.name)
        .map(o => {o.inc += 2;o})
        .filter(_.name != "b")
        .filter(_.name != "c")
        .groupByKey(o=>(o.name,o.value))
        .map(o => (o.name,o))
        .map(o => (o._1,o._2.value,o._2.inc))
        .foreach(println)

env.execute()


The type is transparent to the developer during both the DAG compiling
(programming) and runtime (metadata) phases:

2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
{
        IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
        GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
        MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
        FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
        FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
        GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
        MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
        MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
        IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
        MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
        FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
        FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
        MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
        MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
{
        Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
        Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
        Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
        Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
        Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
        Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
}
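
The step from the "Before expanded" to the "After expanded" DAG in the log
above can be read as removing each GroupByKey node and moving its key
selector onto the edge that bypasses it. The following is a rough sketch of
that reading only, not the actual Eagle expansion code. Node, Edge, KeyGroup
and expand are hypothetical names, and the sketch assumes that two grouping
nodes are never chained directly.

```scala
sealed trait Grouping
case object ShuffleGroup extends Grouping
final case class KeyGroup(by: String) extends Grouping

final case class Node(name: String, isGroupBy: Boolean = false)
final case class Edge(from: Node, to: Node, grouping: Grouping = ShuffleGroup)

object DagExpansion {
  // Assumption: a grouping node is never followed directly by another one.
  def expand(edges: Seq[Edge]): Seq[Edge] =
    edges.flatMap { edge =>
      if (edge.from.isGroupBy) {
        // Outgoing edges of a grouping node are absorbed by the bypass edge.
        Seq.empty
      } else if (edge.to.isGroupBy) {
        // Connect the upstream node to whatever follows the grouping node,
        // and record the grouping on the new edge.
        edges.filter(_.from == edge.to)
          .map(out => Edge(edge.from, out.to, KeyGroup(edge.to.name)))
      } else {
        Seq(edge)
      }
    }

  def main(args: Array[String]): Unit = {
    val source = Node("IterableStreamProducer_1")
    val group  = Node("GroupByKeyProducer_2", isGroupBy = true)
    val map    = Node("MapProducer_3")
    val filter = Node("FilterProducer_4")
    val before = Seq(Edge(source, group), Edge(group, map), Edge(map, filter))
    expand(before).foreach(println)
  }
}
```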

  was:
First, from the application developer's perspective, developers should not
need to care about how internal data structures are exchanged between
processing elements; the API should let users process a stream just like a
collection of typed objects. For example, in the following case we should
allow both:
In a type-safe way: groupBy(_.user)
In a field-based way: groupBy("user")

@StreamDef("hdfsAuditLogStream")
case class AuditLog(timestamp:Long,user:String,path:String,cmd:String)

object TypedSafeApp extends App{
  val ec = ExecutionContext.get[StormExecutionContext]
  ec.fromKafka(ec.getConfig){ bytes => AuditLog(1000,"someone","/tmp/private","open")}.parallism(123) // Stream[AuditLog]
    .groupBy(_.user).parallism(12) // Stream[AuditLog]
    .map { obj => (obj.user,obj)} // Stream[(String, AuditLog)]
    .groupBy(_._1) // Stream[(String, AuditLog)]
    .map(_._2) // Stream[AuditLog]
    .alert.persistAndEmail // Stream[AlertAPIEntity]
}

From the framework's perspective, especially for groupBy semantics, nothing
should look different to the developer, but internally the exchange structure
is (AnyRef, AuditLog):
For groupBy(_.user) and an object obj of type AuditLog, it is always
(obj.user, obj) + fieldsGrouping by 0 for Storm.
For groupBy("user") and an object obj of type AuditLog, it is
(readValueByReflect("user", obj), obj) + fieldsGrouping by 0 for Storm.
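
As a sketch of how the two groupBy flavours could produce the same
(AnyRef, AuditLog) exchange structure: the body of readValueByReflect below
is an assumption (the description only names the function), relying on the
fact that case-class fields compile to public zero-argument accessor methods.
keyBySelector and keyByField are hypothetical helper names.

```scala
final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object FieldAccess {
  // Assumed implementation: look up the accessor method named after the
  // field and invoke it reflectively. Primitive results come back boxed.
  def readValueByReflect(field: String, obj: AnyRef): AnyRef =
    obj.getClass.getMethod(field).invoke(obj)

  // Type-safe flavour: the key comes from a typed selector function.
  def keyBySelector[T <: AnyRef](obj: T)(selector: T => AnyRef): (AnyRef, T) =
    (selector(obj), obj)

  // Field-based flavour: the key is read by field name at runtime.
  def keyByField[T <: AnyRef](obj: T, field: String): (AnyRef, T) =
    (readValueByReflect(field, obj), obj)

  def main(args: Array[String]): Unit = {
    val log = AuditLog(1000L, "someone", "/tmp/private", "open")
    println(keyBySelector(log)(_.user))
    println(keyByField(log, "user"))
  }
}
```

Both helpers put the key at position 0 of the pair, which matches the
"fieldsGrouping by 0" wording above; a misspelled field name only fails at
runtime (NoSuchMethodException), which is the trade-off of the field-based
form.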


> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
