This is nice progress, and good to see we have a type-safe fluent API. Could you please explain a bit more how type safety helps general-purpose monitoring?
In general-purpose monitoring, do we need the input/output types defined? How does a user describe their types from the source to each processing element, and how does the framework interpret the user-declared types into EAGLE's internal types?

Thanks,
Edward

On 12/6/15, 23:47, "Hao Chen" <[email protected]> wrote:

>JIRA: https://issues.apache.org/jira/browse/EAGLE-66
>Pull Request: https://github.com/apache/incubator-eagle/pull/17
>
>*Main Changes*
>
>1. *Typesafe API:* Currently the stream processing API is not type-safe
>(type info is erased by the stream framework), and all programming
>interfaces expose Object/AnyRef to the developer, which is neither
>friendly nor extensible for the framework user (application developer):
>
>public void flatMap(java.util.List<*Object*> input,
>Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
>
>So I propose the interface below (all type information is transparent to
>the developer, with no additional parameters needed, supported by Scala's
>implicit TypeTag):
>
>*class StreamProducer[+T <: Any] extends StreamInfo with
>StreamProtocol[T]*
>
> - *StreamInfo:* contains the stream's core information, including
>   streamId, processing element id/name, and entity type info
>   (class/TypeTag)
> - *StreamProtocol extends JavaStreamProtocol:* contains the basic
>   stream DSL API and the Java-compatible API
>
>class *StreamInfo* extends Serializable {
>  val id: Int = UniqueId.incrementAndGetId()
>  var name: String = null
>  var streamId: String = null
>  var parallelismNum: Int = 1
>  var inKeyed: Boolean = false
>  var outKeyed: Boolean = false
>  var keySelector: KeySelector = null
>
>  var typeClass: Class[_] = classOf[AnyRef]
>  @transient implicit var *typeTag: ru.TypeTag[_]* = ru.typeTag[AnyRef]
>}
>
>2. *KeyValue Based Structure:* Currently the framework user (developer)
>has to deal with field declarations again and again, and the framework
>and the business logic are highly coupled. Per the StreamProtocol
>description, the user should not have to care about framework-level
>details such as Storm's internal data structure of List<Object> with
>Fields<String>, which is not developer-friendly; we should make sure the
>user focuses on business logic only, like:
>
>env.from(tuples)
>  .groupByKey(_.name)
>
>3. *Spout grouping instead of overriding Schema:* Currently, especially
>in use cases like HdfsAuditLog monitoring, if developers want to group by
>a certain key they are forced to override the Schema (specific to Storm),
>which is not good and not reusable.
>
>4. *Environment Decoupled:* Currently the stream (metadata) / DAG (logic)
>/ environment (execution) are coupled with Storm's internal
>implementation, which is bad for evolving into a metadata-driven pipeline
>framework in the future, so we should decouple them.
>
>*5. Compatible with the field-based structure in the old framework and
>applications.*
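If I read point 1 correctly, the idea is roughly the following minimal, self-contained sketch: each operator captures its element type via an implicit TypeTag, so the user never declares types by hand and the framework can still name every node's element type at runtime. TypedStream and the operator bodies here are just my illustration, not the actual EAGLE classes:

import scala.reflect.runtime.{universe => ru}

// Each node captures its element type as a TypeTag at construction time,
// so the framework can inspect/print it when building the DAG.
class TypedStream[T](val nodeName: String)(implicit val tag: ru.TypeTag[T]) {

  // The compiler infers R from the user's lambda and materializes
  // TypeTag[R] implicitly; no type parameter is written by the user.
  def map[R: ru.TypeTag](f: T => R): TypedStream[R] =
    new TypedStream[R]("MapProducer[" + ru.typeOf[R].typeSymbol.name + "]")

  def filter(p: T => Boolean): TypedStream[T] =
    new TypedStream[T]("FilterProducer[" + ru.typeOf[T].typeSymbol.name + "]")
}

case class Entity(name: String, value: Double)

object TypeTagSketch extends App {
  val node = new TypedStream[Entity]("Source[Entity]")
    .filter(_.value > 0)
    .map(e => (e.name, e.value)) // element type inferred as (String, Double)

  println(node.nodeName) // prints: MapProducer[Tuple2]
}

Since every operator returns a stream with a fresh tag, a chain like "Entity -> Tuple2 -> Tuple3" in the sample below can presumably be recovered from the tags alone, which seems to be what the GraphPrinter output shows.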
>*Sample Application*
>
>case class Entity(name: String, value: Double, var inc: Int = 0)
>
>val tuples = Seq(
>  Entity("a", 1),
>  Entity("a", 2),
>  Entity("a", 3),
>  Entity("b", 2),
>  Entity("c", 3),
>  Entity("d", 3)
>)
>
>val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
>
>*// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3*
>
>env.from(tuples)
>  *.groupByKey(_.name)*
>  .map(o => { o.inc += 2; o })
>  .filter(_.name != "b")
>  .filter(_.name != "c")
>  *.groupByKey(o => (o.name, o.value))*
>  .map(o => (o.name, o))
>  .map(o => (o._1, o._2.value, o._2.inc))
>  .foreach(println)
>
>env.execute()
>
>*Type is transparent to the developer during both the DAG compiling
>(programming) and runtime (metadata) phases*
>
>2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
>expanded DAG
>{
>IterableStreamProducer[*Entity*]_1{1} ->
>GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>GroupByKeyProducer[<function1>(*Entity*)]_2{1} -> MapProducer[Entity]_3{1}
>in shuffleGroup
>MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>shuffleGroup
>FilterProducer[*Entity*]_5{1} ->
>GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>GroupByKeyProducer[<function1>(*Entity*)]_6{1} -> MapProducer[Tuple2]_7{1}
>in shuffleGroup
>MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>}
>2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After
>expanded DAG
>{
>IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
>groupByKey(<function1>)
>MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>shuffleGroup
>FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
>groupByKey(<function1>)
>MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>}
>2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]:
>Storm topology DAG
>{
>Spout[IterableStreamProducer[*Entity*]_1]{1} ->
>Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>Bolt[MapProducer[*Entity*]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
>shuffleGroup
>Bolt[FilterProducer[*Entity*]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1}
>in shuffleGroup
>Bolt[FilterProducer[*Entity*]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
>groupByKey(<function1>)
>Bolt[MapProducer[*Tuple2*]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
>shuffleGroup
>Bolt[MapProducer[*Tuple3*]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in
>shuffleGroup
>}
>
>Regards,
>Hao
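And for point 5, I imagine the bridge back to the old field-based structure could look something like the sketch below, using Scala runtime reflection to derive Storm-style field names from a case class and to flatten instances into the List<Object> the old framework expects. fieldsOf/valuesOf are hypothetical helpers of mine, not the actual EAGLE API:

import scala.reflect.runtime.{universe => ru}

object FieldBridge {
  // Read case-class accessor names in declaration order via reflection;
  // these would play the role of Storm's Fields<String>.
  def fieldsOf[T: ru.TypeTag]: List[String] =
    ru.typeOf[T].members.sorted.collect {
      case m: ru.MethodSymbol if m.isCaseAccessor => m.name.toString
    }

  // Flatten a case-class instance into the boxed List<Object> that the
  // old field-based framework consumes.
  def valuesOf(entity: Product): List[AnyRef] =
    entity.productIterator.map(_.asInstanceOf[AnyRef]).toList
}

case class Entity(name: String, value: Double, var inc: Int = 0)

object FieldBridgeSketch extends App {
  println(FieldBridge.fieldsOf[Entity])         // List(name, value, inc)
  println(FieldBridge.valuesOf(Entity("a", 1))) // List(a, 1.0, 0)
}

Is that roughly how the compatibility layer maps user-declared types to the internal structure, or does it go through the typeClass in StreamInfo instead?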
